In [1]:
# conda install -c conda-forge jupyterlab jupyter notebook spyder pandas matplotlib pillow -y
# pip install --no-cache-dir --force-reinstall opencv-python
📘 Python Tutorial – Part 1¶
✅ Topics Covered¶
- Basic Python syntax and usage
- Using NumPy for numerical operations
- Array creation and manipulation
- Random number generation
- Writing custom functions
- Function decorators (for execution time measurement)
- Thread-based parallelism with
ThreadPoolExecutor - Splitting workloads into chunks for parallel computation
- Vectorization vs threading performance comparison
- Acceleration with Numba JIT:
@njit→ compile Python to native machine code for speedprange→ parallelize loops across multiple CPU cores
- Saving variables efficiently (
pickle,joblib,np.save)
🛠️ Implemented Functions¶
make_data()→ Create random dataset with[x, y, m, b, eqn]compute_eqn_block()→ Compute equations on blocks of rowsfill_eqn_threaded()→ Parallel update of equation columnfill_eqn_numba()→ Numba-accelerated, parallelized computationtimeit_decorator()→ Measure execution time of functions
⚡ Key Concepts Practiced¶
- Difference between threads and cores in Python execution
- When threading can help (CPU-bound vs IO-bound workloads)
- Numba JIT compilation to speed up heavy numerical loops
- Using
prangeto exploit multiple cores automatically - Verifying parallel computation correctness
- Profiling performance of threaded vs vectorized vs JIT-compiled code
- Efficient ways to persist Python variables
📂 Files¶
python_tutorial_P1_V1.ipynb→ Jupyter Notebook exportpython_tutorial_P1_V1.html→ HTML export of tutorial session
print(f'abc {var}')¶
In [2]:
a=2
b=5
print('Here is a new line \n2nd line')
print('\nThe addition is',a+b, '-> non-standard')
print(f'\nThe addition is {a+b} -> standard way')
Here is a new line 2nd line The addition is 7 -> non-standard The addition is 7 -> standard way
In [3]:
name = 'Mamun'
friend = 'His name is '+ name
print(friend)
print('quotation : \"')
index = friend.index('m')
print(f'Index of m: {index}')
index = friend.index('Mamun')
print(f'Index of m: {index}')
new_friend = friend.replace('Mamun', 'Shah')
print(new_friend)
print(4%2)
print(11%2)
His name is Mamun quotation : " Index of m: 6 Index of m: 12 His name is Shah 0 1
In [4]:
print(dir(friend))
['__add__', '__class__', '__contains__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__getnewargs__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__len__', '__lt__', '__mod__', '__mul__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__rmod__', '__rmul__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'capitalize', 'casefold', 'center', 'count', 'encode', 'endswith', 'expandtabs', 'find', 'format', 'format_map', 'index', 'isalnum', 'isalpha', 'isascii', 'isdecimal', 'isdigit', 'isidentifier', 'islower', 'isnumeric', 'isprintable', 'isspace', 'istitle', 'isupper', 'join', 'ljust', 'lower', 'lstrip', 'maketrans', 'partition', 'removeprefix', 'removesuffix', 'replace', 'rfind', 'rindex', 'rjust', 'rpartition', 'rsplit', 'rstrip', 'split', 'splitlines', 'startswith', 'strip', 'swapcase', 'title', 'translate', 'upper', 'zfill']
In [5]:
from math import ceil, floor, sqrt
print(f'ceil 3.7: {ceil(3.7)}')
print(f'floor 3.7: {floor(3.7)}')
print(f'sqrt 0.36: {sqrt(0.36)}\n')
print(2**4)
print(pow(2,4))
print(2**0.4)
print(pow(2,0.4))
print(int(3.4))
print(int(3.5))
print(int(3.99))
ceil 3.7: 4 floor 3.7: 3 sqrt 0.36: 0.6 16 16 1.3195079107728942 1.3195079107728942 3 3 3
In [6]:
number = input('Input Number')
print(f'Your number {number}')
name = input('Input Name')
print(f'Your Name {name}')
Your number 12
Your Name Shah
List []¶
In [7]:
data = [0,1,2,3,4]
print(f'Second Element: {data[1]}')
print('\n')
data.append(5)
print(f'data: {data}')
print('\n')
print(f'Last Element: {data[-1]}')
print('\n')
print(f'Data from index3(included) onward: {data[3:]}')
print(f'Data up to index3 (excluded): {data[:3]}')
print(f'Data index1 to index4(excluded) : {data[1:4]}')
list1 = ['a','b','c']
list2 = ['x','y','z']
list1.extend(list2)
print(f'\nlist1: {list1}\n')
Second Element: 1 data: [0, 1, 2, 3, 4, 5] Last Element: 5 Data from index3(included) onward: [3, 4, 5] Data up to index3 (excluded): [0, 1, 2] Data index1 to index4(excluded) : [1, 2, 3] list1: ['a', 'b', 'c', 'x', 'y', 'z']
In [8]:
numbers = [
[1,2,3], # row 0
[4,5,6], #row 1
[7,8,9]
]
numbers[1][1] # numbers row, col
Out[8]:
5
In [9]:
print(dir(list1))
['__add__', '__class__', '__class_getitem__', '__contains__', '__delattr__', '__delitem__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__getstate__', '__gt__', '__hash__', '__iadd__', '__imul__', '__init__', '__init_subclass__', '__iter__', '__le__', '__len__', '__lt__', '__mul__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__reversed__', '__rmul__', '__setattr__', '__setitem__', '__sizeof__', '__str__', '__subclasshook__', 'append', 'clear', 'copy', 'count', 'extend', 'index', 'insert', 'pop', 'remove', 'reverse', 'sort']
Tuple () : (fixed, doesn't support item assignment, you can't change values)¶
In [10]:
d = (3,4)
print(d)
(3, 4)
Dictionary : {key : val}¶
In [11]:
price = {'fish' : 30, 'meat' : 50}
print(price)
dir(price)[-11:] # Last 11 valid attributes and methods
{'fish': 30, 'meat': 50}
Out[11]:
['clear', 'copy', 'fromkeys', 'get', 'items', 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values']
In [12]:
print(f'keys: {price.keys()}')
print(f'values: {price.values()}')
print(f'items: {price.items()}')
keys: dict_keys(['fish', 'meat'])
values: dict_values([30, 50])
items: dict_items([('fish', 30), ('meat', 50)])
In [13]:
fish = price['fish']
print(f'fish: {fish}')
meat = price.get('meat', 'default')
print(f'meat: {meat}')
laptop = price.get('laptop', 'default')
print(f'laptop: {laptop}')
tv = price.get('tv')
print(f'tv: {tv}')
fish: 30 meat: 50 laptop: default tv: None
In [14]:
try:
f = 4
print(f)
except:
print('error')
try:
temp0 = temp0+1
print(temp0)
except Exception as e:
print(e)
4 name 'temp0' is not defined
In [15]:
try:
num = int(input("Enter a number: "))
result = 10 / num
print(f"Result: {result}")
except ValueError:
print("Invalid input: Please enter a whole number.")
except ZeroDivisionError:
print("Error: Cannot divide by zero.")
except Exception as e:
print(f"An unexpected error occurred: {e}")
else:
print("Division successful.")
finally:
print("Execution complete.")
Result: 0.8333333333333334 Division successful. Execution complete.
Open Close File¶
In [16]:
new_file = open('new_file.txt', 'a')
new_file.write ('This is python writing - Line 0')
new_file.write ('\nThis is python writing - Line 1')
new_file.write ('\nThis is python writing - Line 2')
new_file.write ('\nThis is python writing - Line 3')
new_file.write ('\nThis is python writing - Line 4')
new_file.write ('\nThis is python writing - Line 4')
new_file.close()
In [17]:
new_file = open('new_file.txt', 'r')
options = dir(new_file)
print(options[-26:])
new_file.close()
['buffer', 'close', 'closed', 'detach', 'encoding', 'errors', 'fileno', 'flush', 'isatty', 'line_buffering', 'mode', 'name', 'newlines', 'read', 'readable', 'readline', 'readlines', 'reconfigure', 'seek', 'seekable', 'tell', 'truncate', 'writable', 'write', 'write_through', 'writelines']
In [18]:
# Read entire file as a single string
with open('new_file.txt', 'r') as f:
data = f.read()
print(data)
This is python writing - Line 0 This is python writing - Line 1 This is python writing - Line 2 This is python writing - Line 3 This is python writing - Line 4 This is python writing - Line 4
In [19]:
# Read file into a list of lines
with open('new_file.txt', 'r') as f:
data = f.readlines()
print(data) # ["first line\n", "second line\n", ...]
['This is python writing - Line 0\n', 'This is python writing - Line 1\n', 'This is python writing - Line 2\n', 'This is python writing - Line 3\n', 'This is python writing - Line 4\n', 'This is python writing - Line 4']
numpy¶
In [20]:
# NumPy Tutorial 🚀
import numpy as np
# -------------------------------
# 1. Creating Arrays
# -------------------------------
print("1. Creating Arrays")
a = np.array([1, 2, 3]) # from Python list
b = np.array([[1, 2], [3, 4]]) # 2D array
c = np.zeros((2, 3)) # 2x3 of zeros
d = np.ones((3, 3)) # 3x3 of ones
e = np.eye(4) # 4x4 identity matrix
f = np.arange(0, 10, 2) # range: 0 to 10 step 2
g = np.linspace(0, 1, 5) # 5 points evenly spaced between 0 and 1
print("a:", a)
print("b:\n", b)
print("zeros:\n", c)
print("ones:\n", d)
print("identity:\n", e)
print("arange:", f)
print("linspace:", g)
1. Creating Arrays a: [1 2 3] b: [[1 2] [3 4]] zeros: [[0. 0. 0.] [0. 0. 0.]] ones: [[1. 1. 1.] [1. 1. 1.] [1. 1. 1.]] identity: [[1. 0. 0. 0.] [0. 1. 0. 0.] [0. 0. 1. 0.] [0. 0. 0. 1.]] arange: [0 2 4 6 8] linspace: [0. 0.25 0.5 0.75 1. ]
In [21]:
# -------------------------------
# 2. Array Properties
# -------------------------------
print("\n2. Array Properties")
print("Shape of b:", b.shape)
print("Data type of b:", b.dtype)
print("Number of elements in b:", b.size)
# -------------------------------
# 3. Indexing and Slicing
# -------------------------------
print("\n3. Indexing and Slicing")
print("b[0, 1] =", b[0, 1]) # element at row 0, col 1
print("Row 0 of b:", b[0, :])
print("Column 1 of b:", b[:, 1])
2. Array Properties Shape of b: (2, 2) Data type of b: int64 Number of elements in b: 4 3. Indexing and Slicing b[0, 1] = 2 Row 0 of b: [1 2] Column 1 of b: [2 4]
In [22]:
# -------------------------------
# 4. Math Operations
# -------------------------------
print("\n4. Math Operations")
x = np.array([1, 2, 3])
y = np.array([10, 20, 30])
print("x + y =", x + y)
print("x - y =", x - y)
print("x * y =", x * y) # elementwise multiply
print("x / y =", x / y)
print("x ** 2 =", x ** 2)
print("np.sqrt(x):", np.sqrt(x))
# -------------------------------
# 5. Matrix Operations
# -------------------------------
print("\n5. Matrix Operations")
m1 = np.array([[1, 2], [3, 4]])
m2 = np.array([[5, 6], [7, 8]])
print("m1 + m2:\n", m1 + m2)
print("m1 dot m2:\n", np.dot(m1, m2)) # matrix multiplication
print("Transpose of m1:\n", m1.T)
print("Inverse of m1:\n", np.linalg.inv(m1))
4. Math Operations x + y = [11 22 33] x - y = [ -9 -18 -27] x * y = [10 40 90] x / y = [0.1 0.1 0.1] x ** 2 = [1 4 9] np.sqrt(x): [1. 1.41421356 1.73205081] 5. Matrix Operations m1 + m2: [[ 6 8] [10 12]] m1 dot m2: [[19 22] [43 50]] Transpose of m1: [[1 3] [2 4]] Inverse of m1: [[-2. 1. ] [ 1.5 -0.5]]
In [23]:
# -------------------------------
# 6. Statistics
# -------------------------------
print("\n6. Statistics")
stats = np.array([[1, 2, 3], [4, 5, 6]])
print("Mean:", np.mean(stats))
print("Sum over columns:", np.sum(stats, axis=0))
print("Max over rows:", np.max(stats, axis=1))
# -------------------------------
# 7. Random Numbers
# -------------------------------
print("\n7. Random Numbers")
print("Random float [0,1):", np.random.rand())
print("Random 2x2:\n", np.random.rand(2, 2))
print("Random integers 0-9:", np.random.randint(0, 10, (3, 3)))
6. Statistics Mean: 3.5 Sum over columns: [5 7 9] Max over rows: [3 6] 7. Random Numbers Random float [0,1): 0.6620733259999424 Random 2x2: [[0.64429323 0.3909671 ] [0.22019211 0.47329826]] Random integers 0-9: [[0 5 8] [5 6 2] [5 9 5]]
In [24]:
# -------------------------------
# 8. Reshaping and Flattening
# -------------------------------
print("\n8. Reshaping and Flattening")
arr = np.arange(12) # 0..11
print("Original:", arr)
reshaped = arr.reshape(3, 4) # reshape to 3x4
print("Reshaped 3x4:\n", reshaped)
print("Flattened:", reshaped.flatten())
# -------------------------------
# 9. Boolean Masking
# -------------------------------
print("\n9. Boolean Masking")
data = np.array([10, 20, 30, 40, 50])
mask = data > 25
print("Mask:", mask)
print("Values > 25:", data[mask])
# -------------------------------
# 10. Broadcasting
# -------------------------------
print("\n10. Broadcasting")
mat = np.ones((3, 3))
vec = np.array([1, 2, 3])
print("mat:\n", mat)
print("vec:", vec)
print("mat + vec:\n", mat + vec) # vec added to each row
8. Reshaping and Flattening Original: [ 0 1 2 3 4 5 6 7 8 9 10 11] Reshaped 3x4: [[ 0 1 2 3] [ 4 5 6 7] [ 8 9 10 11]] Flattened: [ 0 1 2 3 4 5 6 7 8 9 10 11] 9. Boolean Masking Mask: [False False True True True] Values > 25: [30 40 50] 10. Broadcasting mat: [[1. 1. 1.] [1. 1. 1.] [1. 1. 1.]] vec: [1 2 3] mat + vec: [[2. 3. 4.] [2. 3. 4.] [2. 3. 4.]]
pandas¶
In [25]:
# Pandas Tutorial 📊
# conda install -c conda-forge pandas
import pandas as pd
import numpy as np
# -------------------------------
# 1. Creating Series
# -------------------------------
print("1. Creating Series")
s = pd.Series([10, 20, 30, 40], index=["a", "b", "c", "d"])
print(s)
print("Access element by label:", s["b"])
print("Access element by index:", s.iloc[2])
# -------------------------------
# 2. Creating DataFrames
# -------------------------------
print("\n2. Creating DataFrames")
data = {
"Name": ["Alice", "Bob", "Charlie", "David"],
"Age": [25, 30, 35, 40],
"City": ["NYC", "LA", "Chicago", "Houston"]
}
df = pd.DataFrame(data)
print(df)
# -------------------------------
# 7. Basic Statistics
# -------------------------------
print("\n7. Basic Statistics")
print("Mean age:", df["Age"].mean())
print("Summary:\n", df.describe())
# -------------------------------
# 8. Sorting
# -------------------------------
print("\n8. Sorting")
print("Sort by Age:\n", df.sort_values("Age", ascending=False))
# -------------------------------
# 9. GroupBy
# -------------------------------
print("\n9. GroupBy")
df2 = pd.DataFrame({
"Department": ["IT", "IT", "HR", "HR", "Finance"],
"Salary": [60000, 65000, 50000, 52000, 70000],
"Experience": [2, 3, 5, 7, 10]
})
print(df2)
print("Mean salary by department:\n", df2.groupby("Department")["Salary"].mean())
# -------------------------------
# 10. Merging / Joining
# -------------------------------
print("\n10. Merging / Joining")
employees = pd.DataFrame({
"Name": ["Alice", "Bob", "Charlie"],
"Dept": ["IT", "HR", "Finance"]
})
salaries = pd.DataFrame({
"Dept": ["IT", "HR", "Finance"],
"Salary": [60000, 50000, 70000]
})
merged = pd.merge(employees, salaries, on="Dept", how="left")
print(merged)
# -------------------------------
# 11. Handling Missing Data
# -------------------------------
print("\n11. Handling Missing Data")
df3 = pd.DataFrame({
"A": [1, 2, np.nan, 4],
"B": [5, np.nan, np.nan, 8]
})
print("Original:\n", df3)
print("Drop rows with NaN:\n", df3.dropna())
print("Fill NaN with 0:\n", df3.fillna(0))
print("Fill NaN with column mean:\n", df3.fillna(df3.mean(numeric_only=True)))
# -------------------------------
# 12. Reading / Writing CSV
# -------------------------------
print("\n12. Reading / Writing CSV")
# df.to_csv("example.csv", index=False) # save
# loaded = pd.read_csv("example.csv") # load
# print(loaded)
print("Use df.to_csv(...) to write and pd.read_csv(...) to read")
1. Creating Series
a 10
b 20
c 30
d 40
dtype: int64
Access element by label: 20
Access element by index: 30
2. Creating DataFrames
Name Age City
0 Alice 25 NYC
1 Bob 30 LA
2 Charlie 35 Chicago
3 David 40 Houston
7. Basic Statistics
Mean age: 32.5
Summary:
Age
count 4.000000
mean 32.500000
std 6.454972
min 25.000000
25% 28.750000
50% 32.500000
75% 36.250000
max 40.000000
8. Sorting
Sort by Age:
Name Age City
3 David 40 Houston
2 Charlie 35 Chicago
1 Bob 30 LA
0 Alice 25 NYC
9. GroupBy
Department Salary Experience
0 IT 60000 2
1 IT 65000 3
2 HR 50000 5
3 HR 52000 7
4 Finance 70000 10
Mean salary by department:
Department
Finance 70000.0
HR 51000.0
IT 62500.0
Name: Salary, dtype: float64
10. Merging / Joining
Name Dept Salary
0 Alice IT 60000
1 Bob HR 50000
2 Charlie Finance 70000
11. Handling Missing Data
Original:
A B
0 1.0 5.0
1 2.0 NaN
2 NaN NaN
3 4.0 8.0
Drop rows with NaN:
A B
0 1.0 5.0
3 4.0 8.0
Fill NaN with 0:
A B
0 1.0 5.0
1 2.0 0.0
2 0.0 0.0
3 4.0 8.0
Fill NaN with column mean:
A B
0 1.000000 5.0
1 2.000000 6.5
2 2.333333 6.5
3 4.000000 8.0
12. Reading / Writing CSV
Use df.to_csv(...) to write and pd.read_csv(...) to read
In [26]:
# -------------------------------
# 3. Inspecting Data
# -------------------------------
print("\n3. Inspecting Data")
print("Head:\n", df.head(2))
print("Shape:", df.shape)
print("Columns:", df.columns)
print("Info:")
print(df.info())
# -------------------------------
# 4. Selecting Data
# -------------------------------
print("\n4. Selecting Data")
print("Single column:\n", df["Name"])
print("Multiple columns:\n", df[["Name", "City"]])
print("Row by label:\n", df.loc[1])
print("Row by position:\n", df.iloc[2])
3. Inspecting Data
Head:
Name Age City
0 Alice 25 NYC
1 Bob 30 LA
Shape: (4, 3)
Columns: Index(['Name', 'Age', 'City'], dtype='object')
Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4 entries, 0 to 3
Data columns (total 3 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 Name 4 non-null object
1 Age 4 non-null int64
2 City 4 non-null object
dtypes: int64(1), object(2)
memory usage: 228.0+ bytes
None
4. Selecting Data
Single column:
0 Alice
1 Bob
2 Charlie
3 David
Name: Name, dtype: object
Multiple columns:
Name City
0 Alice NYC
1 Bob LA
2 Charlie Chicago
3 David Houston
Row by label:
Name Bob
Age 30
City LA
Name: 1, dtype: object
Row by position:
Name Charlie
Age 35
City Chicago
Name: 2, dtype: object
In [27]:
# -------------------------------
# 5. Filtering Rows
# -------------------------------
print("\n5. Filtering Rows")
print("Age > 30:\n", df[df["Age"] > 30])
print("City == LA:\n", df[df["City"] == "LA"])
# -------------------------------
# 6. Adding / Modifying Columns
# -------------------------------
print("\n6. Adding / Modifying Columns")
df["Age in 5 years"] = df["Age"] + 5
df["Senior"] = df["Age"] > 35
print(df)
5. Filtering Rows
Age > 30:
Name Age City
2 Charlie 35 Chicago
3 David 40 Houston
City == LA:
Name Age City
1 Bob 30 LA
6. Adding / Modifying Columns
Name Age City Age in 5 years Senior
0 Alice 25 NYC 30 False
1 Bob 30 LA 35 False
2 Charlie 35 Chicago 40 False
3 David 40 Houston 45 True
function¶
In [28]:
# Python Functions Tutorial ⚡
# -------------------------------
# 1. Basic Function
# -------------------------------
def greet():
print("Hello, world!")
greet()
# -------------------------------
# 2. Function with Parameters
# -------------------------------
def greet_person(name):
print(f"Hello, {name}!")
greet_person("Alice")
greet_person("Bob")
Hello, world! Hello, Alice! Hello, Bob!
In [29]:
# -------------------------------
# 3. Function with Return Value
# -------------------------------
def square(x):
return x * x
result = square(5)
print("Square of 5:", result)
# -------------------------------
# 4. Multiple Parameters
# -------------------------------
def add(a, b):
return a + b
print("2 + 3 =", add(2, 3))
# -------------------------------
# 5. Default Arguments
# -------------------------------
def power(base, exp=2):
return base ** exp
print("power(3):", power(3)) # uses default exp=2
print("power(3, 3):", power(3, 3))
Square of 5: 25 2 + 3 = 5 power(3): 9 power(3, 3): 27
In [30]:
# -------------------------------
# 6. Keyword Arguments
# -------------------------------
def introduce(name, age, city):
print(f"My name is {name}, I'm {age}, and I live in {city}.")
introduce(age=25, name="Charlie", city="NYC")
# -------------------------------
# 7. Variable Number of Arguments (*args)
# -------------------------------
def sum_all(*args):
return sum(args)
print("sum_all(1,2,3,4,5):", sum_all(1, 2, 3, 4, 5))
# -------------------------------
# 8. Variable Keyword Arguments (**kwargs)
# -------------------------------
def profile(**kwargs):
for key, value in kwargs.items():
print(f"{key}: {value}")
profile(name="David", age=30, city="LA")
My name is Charlie, I'm 25, and I live in NYC. sum_all(1,2,3,4,5): 15 name: David age: 30 city: LA
In [31]:
# -------------------------------
# 9. Lambda (Anonymous Functions)
# -------------------------------
double = lambda x: x * 2
print("double(7):", double(7))
# Use with map/filter
nums = [1, 2, 3, 4, 5]
squared = list(map(lambda x: x**2, nums))
even = list(filter(lambda x: x % 2 == 0, nums))
print("Squared:", squared)
print("Even:", even)
double(7): 14 Squared: [1, 4, 9, 16, 25] Even: [2, 4]
In [32]:
# -------------------------------
# 10. Functions Inside Functions
# -------------------------------
def outer_function(text):
def inner_function():
return text.upper()
return inner_function()
print("Inner function result:", outer_function("hello"))
Inner function result: HELLO
In [33]:
# -------------------------------
# 11. Functions as Arguments
# -------------------------------
def apply(func, value):
return func(value)
print("apply(square, 6):", apply(square, 6))
# -------------------------------
# 12. Recursive Function
# -------------------------------
def factorial(n):
if n == 0 or n == 1:
return 1
return n * factorial(n - 1)
print("Factorial of 5:", factorial(5))
# -------------------------------
# 13. Docstrings
# -------------------------------
def divide(a, b):
"""
Divides two numbers.
Parameters:
a (int/float): numerator
b (int/float): denominator
Returns:
float: result of division
"""
return a / b
print("divide(10,2):", divide(10, 2))
print("Help on divide:")
help(divide)
apply(square, 6): 36
Factorial of 5: 120
divide(10,2): 5.0
Help on divide:
Help on function divide in module __main__:
divide(a, b)
Divides two numbers.
Parameters:
a (int/float): numerator
b (int/float): denominator
Returns:
float: result of division
Using *args (variable positional arguments)¶
In [34]:
def add_numbers(*args):
print("args as tuple:", args)
return sum(args)
print(add_numbers(1, 2, 3))
print(add_numbers(10, 20, 30, 40))
args as tuple: (1, 2, 3) 6 args as tuple: (10, 20, 30, 40) 100
In [35]:
#### Using **kwargs (variable keyword arguments)
In [36]:
def print_profile(**kwargs):
print("kwargs as dict:", kwargs)
for key, value in kwargs.items():
print(f"{key}: {value}")
print_profile(name="Alice", age=25, city="NYC")
kwargs as dict: {'name': 'Alice', 'age': 25, 'city': 'NYC'}
name: Alice
age: 25
city: NYC
Combining *args and **kwargs¶
In [37]:
def demo_func(*args, **kwargs):
print("args:", args)
print("kwargs:", kwargs)
demo_func(1, 2, 3, name="Bob", age=30)
args: (1, 2, 3)
kwargs: {'name': 'Bob', 'age': 30}
Forwarding Arguments¶
In [38]:
def greet(greeting, name):
print(f"{greeting}, {name}!")
def wrapper(*args, **kwargs):
print("Wrapper adds logging...")
return greet(*args, **kwargs)
wrapper("Hello", name="Charlie")
Wrapper adds logging... Hello, Charlie!
Mixing with Normal Arguments¶
In [39]:
def order(item, qty=1, *args, **kwargs):
print(f"Item: {item}, Qty: {qty}")
print("Extra positional:", args)
print("Extra keyword:", kwargs)
order("Book", 2, "Express shipping", "Gift wrap", address="NYC", discount=0.1)
Item: Book, Qty: 2
Extra positional: ('Express shipping', 'Gift wrap')
Extra keyword: {'address': 'NYC', 'discount': 0.1}
Fixing arguments for math functions¶
In [40]:
from functools import partial
import math
# Normal pow(base, exp)
print(pow(2, 3)) # 8
print('_'*25)
# Create a "square" function by fixing exp=2
square = partial(pow, exp=2)
print(square(5)) # 25
print('_'*25)
# Cube function
cube = partial(pow, exp=3)
print(cube(4)) # 64
print('_'*25)
# Using with higher-order functions
def multiply(x, y):
return x * y
double = partial(multiply, 2)
triple = partial(multiply, 3)
print(double(5)) # 10
print(triple(5)) # 15
print('_'*25)
# Simplifying function calls with many parameters
def order(item, qty, discount, tax):
return f"{qty}x {item} with {discount*100:.0f}% discount and {tax*100:.0f}% tax"
# Fix tax and discount
order_with_tax = partial(order, discount=0.1, tax=0.07)
print(order_with_tax("Book", 3))
print(order_with_tax("Laptop", 1))
8 _________________________ 25 _________________________ 64 _________________________ 10 15 _________________________ 3x Book with 10% discount and 7% tax 1x Laptop with 10% discount and 7% tax
Pre-configuring built-in functions¶
In [41]:
from functools import partial
print_with_comma = partial(print, sep=", ")
print_with_comma("Alice", "Bob", "Charlie")
Alice, Bob, Charlie
class¶
In [42]:
# Python Classes Tutorial 🏗️
# -------------------------------
# 1. A Basic Class
# -------------------------------
class Person:
pass
p1 = Person()
print("Created a Person object:", p1)
# -------------------------------
# 2. Adding Attributes
# -------------------------------
class Person:
def __init__(self, name, age):
self.name = name # instance attribute
self.age = age
p2 = Person("Alice", 25)
print(f"Name: {p2.name}, Age: {p2.age}")
Created a Person object: <__main__.Person object at 0x000001FAB9E5B590> Name: Alice, Age: 25
In [43]:
# -------------------------------
# 3. Adding Methods
# -------------------------------
class Person:
def __init__(self, name, age):
self.name = name
self.age = age
def greet(self):
return f"Hello, I'm {self.name} and I'm {self.age} years old."
p3 = Person("Bob", 30)
print(p3.greet())
# -------------------------------
# 4. Class Attributes vs Instance Attributes
# -------------------------------
class Dog:
species = "Canis familiaris" # class attribute
def __init__(self, name):
self.name = name # instance attribute
dog1 = Dog("Fido")
dog2 = Dog("Buddy")
print(dog1.name, "-", dog1.species)
print(dog2.name, "-", dog2.species)
Hello, I'm Bob and I'm 30 years old. Fido - Canis familiaris Buddy - Canis familiaris
In [44]:
# -------------------------------
# 5. Special Methods (__str__, __len__, etc.)
# -------------------------------
class Book:
def __init__(self, title, pages):
self.title = title
self.pages = pages
def __str__(self):
return f"Book: {self.title} ({self.pages} pages)"
def __len__(self):
return self.pages
b1 = Book("Python Basics", 250)
print(str(b1))
print("Length of book:", len(b1))
Book: Python Basics (250 pages) Length of book: 250
In [45]:
# -------------------------------
# 6. Encapsulation (Private variables)
# -------------------------------
class BankAccount:
def __init__(self, owner, balance=0):
self.owner = owner
self.__balance = balance # private attribute (convention: __var)
def deposit(self, amount):
self.__balance += amount
return self.__balance
def withdraw(self, amount):
if amount <= self.__balance:
self.__balance -= amount
return self.__balance
else:
return "Insufficient funds"
def get_balance(self):
return self.__balance
acct = BankAccount("Charlie", 100)
print(acct.deposit(50))
print(acct.withdraw(30))
print("Balance:", acct.get_balance())
150 120 Balance: 120
In [46]:
# -------------------------------
# 7. Inheritance
# -------------------------------
class Animal:
def __init__(self, name):
self.name = name
def speak(self):
return "..."
class Dog(Animal):
def speak(self):
return "Woof!"
class Cat(Animal):
def speak(self):
return "Meow!"
animals = [Dog("Fido"), Cat("Whiskers")]
for a in animals:
print(a.name, "says", a.speak())
Fido says Woof! Whiskers says Meow!
In [47]:
# -------------------------------
# 8. Multiple Inheritance
# -------------------------------
class Flyer:
def fly(self):
return "I can fly!"
class Swimmer:
def swim(self):
return "I can swim!"
class Duck(Flyer, Swimmer):
def sound(self):
return "Quack!"
d = Duck()
print(d.fly(), d.swim(), d.sound())
I can fly! I can swim! Quack!
In [48]:
# -------------------------------
# 9. Class Methods and Static Methods
# -------------------------------
class MathUtils:
@staticmethod
def add(a, b):
return a + b
@classmethod
def description(cls):
return f"This is {cls.__name__}, providing math utilities."
print(MathUtils.add(2, 3))
print(MathUtils.description())
5 This is MathUtils, providing math utilities.
In [49]:
# -------------------------------
# 10. Properties (getter/setter)
# -------------------------------
class Celsius:
def __init__(self, temperature=0):
self._temperature = temperature
@property #getter
def temperature(self):
return self._temperature
@temperature.setter
def temperature(self, value):
if value < -273.15:
raise ValueError("Temperature below -273.15 is not possible")
self._temperature = value
c = Celsius()
c.temperature = 25
print("Temp in Celsius:", c.temperature)
# c.temperature = -300 # will raise ValueError
Temp in Celsius: 25
In [50]:
dir(c)[-2:]
Out[50]:
['_temperature', 'temperature']
Update property of a class¶
In [51]:
c = Celsius()
def temperature_new(self, value):
if value < -100:
print("Temperature below -100 is too much cold")
self._temperature = value
In [52]:
Celsius.temperature = Celsius.temperature.setter(temperature_new)
c.temperature = -101
Temperature below -100 is too much cold
Update function of a class¶
In [53]:
class Car:
def start(self):
print("Starting the car...")
# Create two instances
car1 = Car()
car2 = Car()
# Original behavior
car1.start() # Output: Starting the car...
car2.start() # Output: Starting the car...
# --- Define a new method (standalone function) ---
def start_new(self):
print("Car starts silently with electric power ⚡")
# Replace the method *only* for car1 using __get__
car1.start = start_new.__get__(car1, Car)
# Test behavior again
car1.start() # Output: Car starts silently with electric power ⚡
car2.start() # Output: Starting the car...
Starting the car... Starting the car... Car starts silently with electric power ⚡ Starting the car...
Decorator¶
Basic Decorator¶
In [54]:
def my_decorator(func):
def wrapper():
print("Before function runs")
func()
print("After function runs")
return wrapper
@my_decorator
def say_hello():
print("Hello!")
say_hello()
Before function runs Hello! After function runs
Decorator with Arguments¶
In [55]:
def repeat(n):
def decorator(func):
def wrapper(*args, **kwargs):
for _ in range(n):
func(*args, **kwargs)
return wrapper
return decorator
@repeat(3)
def greet(name):
print(f"Hello, {name}!")
greet("Alice")
Hello, Alice! Hello, Alice! Hello, Alice!
Preserving Function Metadata (functools.wraps)¶
In [56]:
from functools import wraps
def debug(func):
@wraps(func)
def wrapper(*args, **kwargs):
print(f"Calling {func.__name__} with args={args}, kwargs={kwargs}")
return func(*args, **kwargs)
return wrapper
@debug
def add(a, b):
"""Add two numbers"""
return a + b
print(add(2, 3))
print(add.__name__) # preserved: "add"
print(add.__doc__) # preserved: "Add two numbers"
Calling add with args=(2, 3), kwargs={}
5
add
Add two numbers
Real-World Example: Timing a Function¶
In [57]:
import time
from functools import wraps
def timer(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
end = time.time()
print(f"{func.__name__} ran in {end - start:.4f} seconds")
return result
return wrapper
@timer
def slow_function():
time.sleep(2)
return "Done!"
print(slow_function())
slow_function ran in 2.0012 seconds Done!
Real-World Example: Authentication Check¶
In [58]:
def require_admin(func):
@wraps(func)
def wrapper(user, *args, **kwargs):
if user != "admin":
print("Access denied!")
return None
return func(user, *args, **kwargs)
return wrapper
@require_admin
def delete_user(user, target):
print(f"{user} deleted {target}")
delete_user("guest", "Alice") # Access denied
delete_user("admin", "Bob") # Works
Access denied! admin deleted Bob
Stacking Multiple Decorators¶
In [59]:
def bold(func):
@wraps(func)
def wrapper():
return "<b>" + func() + "</b>"
return wrapper
def italic(func):
@wraps(func)
def wrapper():
return "<i>" + func() + "</i>"
return wrapper
@bold
@italic
def text():
return "Hello"
print(text()) # <b><i>Hello</i></b>
<b><i>Hello</i></b>
The Problem Without @wraps¶
Example without @wraps¶
In [60]:
def debug(func):
def wrapper(*args, **kwargs):
print(f"Calling {func.__name__} with {args}")
return func(*args, **kwargs)
return wrapper
@debug
def add(a, b):
"""Add two numbers"""
return a + b
print(add(2, 3))
print(add.__name__) # OOPS
print(add.__doc__) # OOPS
Calling add with (2, 3) 5 wrapper None
Example with @wraps¶
In [61]:
from functools import wraps
def debug(func):
@wraps(func)
def wrapper(*args, **kwargs):
print(f"Calling {func.__name__} with {args}")
return func(*args, **kwargs)
return wrapper
@debug
def add(a, b):
"""Add two numbers"""
return a + b
print(add(2, 3))
print(add.__name__) # ✅ "add"
print(add.__doc__) # ✅ "Add two numbers"
Calling add with (2, 3) 5 add Add two numbers
Example: Same Decorator on Two Functions¶
In [62]:
from functools import wraps
def debug(func):
@wraps(func)
def wrapper(*args, **kwargs):
print(f"Calling {func.__name__} with args={args}, kwargs={kwargs}")
result = func(*args, **kwargs)
print(f"{func.__name__} returned {result}")
return result
return wrapper
@debug
def add(a, b):
return a + b
@debug
def multiply(a, b):
return a * b
print(add(3, 4))
print(multiply(3, 4))
Calling add with args=(3, 4), kwargs={}
add returned 7
7
Calling multiply with args=(3, 4), kwargs={}
multiply returned 12
12
Another Example: Timing Any Function¶
In [63]:
import time
from functools import wraps
def timer(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
end = time.time()
print(f"{func.__name__} took {end - start:.4f} seconds")
return result
return wrapper
@timer
def slow_square(n):
time.sleep(1)
return n * n
@timer
def slow_sum(n):
time.sleep(2)
return sum(range(n))
print(slow_square(5))
print(slow_sum(1000000))
slow_square took 1.0013 seconds 25 slow_sum took 2.0022 seconds 499999500000
if elif else¶
In [64]:
x = 10
if x > 5:
print("x is greater than 5")
print('_'*25)
x = 3
if x % 2 == 0:
print("x is even")
else:
print("x is odd")
print('_'*25)
score = 85
if score >= 90:
print("Grade: A")
elif score >= 80:
print("Grade: B")
elif score >= 70:
print("Grade: C")
else:
print("Grade: F")
x is greater than 5 _________________________ x is odd _________________________ Grade: B
Nested if¶
In [65]:
x = 15
if x > 0:
if x % 2 == 0:
print("x is positive and even")
else:
print("x is positive and odd")
else:
print("x is not positive")
x is positive and odd
Multiple Conditions (and, or, not)¶
In [66]:
age = 20
has_id = True
if age >= 18 and has_id:
print("Access granted")
else:
print("Access denied")
Access granted
Ternary Expression (one-line if/else)¶
In [67]:
x = 7
result = "Even" if x % 2 == 0 else "Odd"
print(result)
Odd
Real Example: Login System¶
In [68]:
username = "admin"
password = "1234"
if username == "admin" and password == "1234":
print("Login successful ✅")
elif username == "admin":
print("Wrong password ❌")
else:
print("Unknown user 🚫")
Login successful ✅
Real Example: Traffic Light¶
In [69]:
light = "yellow"
if light == "green":
print("Go 🚗")
elif light == "yellow":
print("Slow down ⚠️")
elif light == "red":
print("Stop 🛑")
else:
print("Invalid signal ❓")
Slow down ⚠️
Operators ==, !=, <=, >=, <, >¶
In [70]:
a, b = 5, 3
print(a == b) # False
print(a != b) # True
print(5 > 3) # True
print(5 < 3) # False
print(5 >= 5) # True
print(3 <= 5) # True
False True True False True True
In [71]:
age = 20
has_id = True
if age >= 18 and has_id:
print("Allowed ✅")
else:
print("Denied ❌")
Allowed ✅
In [72]:
day = "Sunday"
if day == "Saturday" or day == "Sunday":
print("Weekend 🎉")
else:
print("Weekday 💼")
Weekend 🎉
In [73]:
logged_in = False
if not logged_in:
print("Please log in 🔑")
else:
print("Welcome back 👋")
Please log in 🔑
In [74]:
x = 15
if x > 10 and x < 20:
print("x is between 10 and 20")
if x < 5 or x > 12:
print("x is either less than 5 or greater than 12")
if not (x == 15):
print("x is not 15")
else:
print("x is 15 ✅")
x is between 10 and 20 x is either less than 5 or greater than 12 x is 15 ✅
for loop¶
In [75]:
fruits = ["apple", "banana", "cherry"]
for fruit in fruits:
print(fruit)
apple banana cherry
In [76]:
for letter in "hello":
print(letter)
h e l l o
In [77]:
for i in range(5): # 0 to 4
print(i)
for i in range(2, 6): # 2 to 5
print(i)
for i in range(0, 10, 2): # step of 2
print(i)
0 1 2 3 4 2 3 4 5 0 2 4 6 8
In [78]:
# Looping with enumerate()
colors = ["red", "green", "blue"]
for index, color in enumerate(colors):
print(index, color)
0 red 1 green 2 blue
In [79]:
# Looping over a dictionary
person = {"name": "Alice", "age": 25, "city": "NYC"}
for key, value in person.items():
print(key, ":", value)
name : Alice age : 25 city : NYC
In [80]:
# Nested loops
for i in range(1, 4):
for j in range(1, 4):
print(f"i={i}, j={j}")
i=1, j=1 i=1, j=2 i=1, j=3 i=2, j=1 i=2, j=2 i=2, j=3 i=3, j=1 i=3, j=2 i=3, j=3
In [81]:
# Using break and continue
nums = [1, 2, 3, 4, 5]
for n in nums:
if n == 3:
continue # skip 3
if n == 5:
break # stop at 5
print(n)
1 2 4
In [82]:
# Loop with else
for n in range(5):
print(n)
else:
print("Loop finished without break")
# The else block runs only if the loop does not end with break
0 1 2 3 4 Loop finished without break
List Comprehension (short for loop)¶
In [83]:
squares = [x**2 for x in range(5)]
print(squares)
print('_'*25)
evens = [x for x in range(10) if x % 2 == 0]
print(evens)
[0, 1, 4, 9, 16] _________________________ [0, 2, 4, 6, 8]
while loop¶
In [84]:
# Infinite loop (⚠️ careful!)
# while True:
# print("This runs forever!")
In [85]:
x = 1
while True:
print(x)
if x == 3:
break # exit loop
x += 1
1 2 3
In [86]:
n = 0
while n < 5:
n += 1
if n == 3:
continue # skip printing 3
print(n)
1 2 4 5
In [87]:
i = 1
while i <= 3:
print("Loop", i)
i += 1
else:
print("Loop finished (no break)")
Loop 1 Loop 2 Loop 3 Loop finished (no break)
In [88]:
n = 1
while n <= 5:
print(f"5 x {n} = {5*n}")
n += 1
5 x 1 = 5 5 x 2 = 10 5 x 3 = 15 5 x 4 = 20 5 x 5 = 25
print with colors¶
Using ANSI Escape Codes (built-in)¶
In [89]:
# Format: \033[<style>;<text_color>;<background_color>m
print("\033[31mThis is Red Text\033[0m")
print("\033[32mThis is Green Text\033[0m")
print("\033[33mThis is Yellow Text\033[0m")
print("\033[34mThis is Blue Text\033[0m")
print("\033[35mThis is Magenta Text\033[0m")
print("\033[36mThis is Cyan Text\033[0m")
print("\033[37mThis is White Text\033[0m")
This is Red Text This is Green Text This is Yellow Text This is Blue Text This is Magenta Text This is Cyan Text This is White Text
Add Styles¶
In [90]:
print("\033[1;32mBold Green\033[0m")
print("\033[4;34mUnderlined Blue\033[0m")
print("\033[7;31mInverted Red\033[0m")
Bold Green Underlined Blue Inverted Red
With Background Colors¶
In [91]:
print("\033[30;47mBlack text on White background\033[0m")
print("\033[33;44mYellow text on Blue background\033[0m")
Black text on White background Yellow text on Blue background
Using colorama (Windows friendly 🌐)- (terminal only)¶
In [92]:
#pip install colorama)
from colorama import Fore, Back, Style, init
init(autoreset=True) # auto reset after each print
print(Fore.RED + "Red text")
print(Fore.GREEN + "Green text")
print(Back.YELLOW + "With Yellow Background")
print(Style.BRIGHT + Fore.BLUE + "Bright Blue text")
Red text Green text With Yellow Background Bright Blue text
Using termcolor - (terminal only)¶
In [93]:
# pip install termcolor
from termcolor import colored
print(colored("Hello in Red", "red"))
print(colored("Green on Yellow", "green", "on_yellow"))
print(colored("Bold Blue", "blue", attrs=["bold"]))
Hello in Red Green on Yellow Bold Blue
Use IPython display + HTML in Jupyter¶
In [94]:
from IPython.display import HTML, display
def cprint(text, color="red", bgcolor=None, bold=False):
style = f"color:{color};"
if bgcolor:
style += f"background-color:{bgcolor};"
if bold:
style += "font-weight:bold;"
display(HTML(f"<span style='{style}'>{text}</span>"))
cprint("Hello Red", color="red")
cprint("Green on Yellow", color="green", bgcolor="yellow")
cprint("Bold Blue", color="blue", bold=True)
Hello Red
Green on Yellow
Bold Blue
Use rich (best for modern notebooks & terminals) **************¶
In [95]:
# pip install rich
from rich import print as rprint
rprint('No color', end=" end ")
rprint("[red]Hello in Red[/red]", end=" ")
rprint("[green on yellow]Green on Yellow[/green on yellow]")
rprint("[bold blue]Bold Blue[/bold blue]")
No color end
Hello in Red
Green on Yellow
Bold Blue
Dynamic Programming¶
In [96]:
def fib(n, memo={}):
if n in memo:
return memo[n]
if n <= 1:
return n
memo[n] = fib(n-1, memo) + fib(n-2, memo)
return memo[n]
print([fib(i) for i in range(10)]) # First 10 Fibonacci numbers
[0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
In [97]:
def fib_tab(n):
dp = [0, 1] + [0]*(n-1)
for i in range(2, n+1):
dp[i] = dp[i-1] + dp[i-2]
return dp[n]
print(fib_tab(100)) # 55
354224848179261915075
In [98]:
def climb_stairs(n):
if n <= 2:
return n
dp = [0]*(n+1)
dp[1], dp[2] = 1, 2
for i in range(3, n+1):
dp[i] = dp[i-1] + dp[i-2]
return dp[n]
print(climb_stairs(5)) # 8
8
In [99]:
def coin_change(coins, amount):
dp = [float("inf")] * (amount+1)
dp[0] = 0
for coin in coins:
for x in range(coin, amount+1):
dp[x] = min(dp[x], dp[x-coin] + 1)
return dp[amount] if dp[amount] != float("inf") else -1
print(coin_change([1,2,5], 11)) # 3 (5+5+1)
3
🔹 Longest Common Subsequence (LCS)¶
In [100]:
def lcs(s1, s2):
m, n = len(s1), len(s2)
dp = [[0]*(n+1) for _ in range(m+1)]
for i in range(m-1, -1, -1):
for j in range(n-1, -1, -1):
if s1[i] == s2[j]:
dp[i][j] = 1 + dp[i+1][j+1]
else:
dp[i][j] = max(dp[i+1][j], dp[i][j+1])
return dp[0][0]
print(lcs("abcde", "ace")) # 3 ("ace")
3
In [101]:
# 🔹 0/1 Knapsack
In [102]:
def knapsack(weights, values, W):
n = len(values)
dp = [[0]*(W+1) for _ in range(n+1)]
for i in range(1, n+1):
for w in range(1, W+1):
if weights[i-1] <= w:
dp[i][w] = max(dp[i-1][w], values[i-1] + dp[i-1][w-weights[i-1]])
else:
dp[i][w] = dp[i-1][w]
return dp[n][W]
print(knapsack([1,3,4], [15,20,30], 4)) # 35
35
Algorithms¶
In [103]:
def bellman_ford(graph, V, src):
# graph = [(u, v, w), ...]
dist = [float("inf")] * V
dist[src] = 0
# Relax edges V-1 times
for _ in range(V-1):
for u, v, w in graph:
if dist[u] != float("inf") and dist[u] + w < dist[v]:
dist[v] = dist[u] + w
# Detect negative cycles
for u, v, w in graph:
if dist[u] != float("inf") and dist[u] + w < dist[v]:
print("Graph contains negative weight cycle!")
return None
return dist
# Example graph
edges = [
(0, 1, -1), (0, 2, 4),
(1, 2, 3), (1, 3, 2), (1, 4, 2),
(3, 2, 5), (3, 1, 1), (4, 3, -3)
]
print(bellman_ford(edges, 5, 0)) # [0, -1, 2, -2, 1]
[0, -1, 2, -2, 1]
In [104]:
import heapq
def dijkstra(graph, src):
# graph as adjacency list: {u: [(v, w), ...]}
dist = {node: float("inf") for node in graph}
dist[src] = 0
pq = [(0, src)] # (distance, node)
while pq:
d, u = heapq.heappop(pq)
if d > dist[u]:
continue
for v, w in graph[u]:
if dist[u] + w < dist[v]:
dist[v] = dist[u] + w
heapq.heappush(pq, (dist[v], v))
return dist
graph = {
0: [(1, 4), (2, 1)],
1: [(3, 1)],
2: [(1, 2), (3, 5)],
3: []
}
print(dijkstra(graph, 0)) # {0: 0, 1: 3, 2: 1, 3: 4}
{0: 0, 1: 3, 2: 1, 3: 4}
In [105]:
def floyd_warshall(matrix):
V = len(matrix)
dist = [row[:] for row in matrix] # copy
for k in range(V):
for i in range(V):
for j in range(V):
if dist[i][k] + dist[k][j] < dist[i][j]:
dist[i][j] = dist[i][k] + dist[k][j]
return dist
INF = float("inf")
graph = [
[0, 5, INF, 10],
[INF, 0, 3, INF],
[INF, INF, 0, 1],
[INF, INF, INF, 0]
]
for row in floyd_warshall(graph):
print(row)
[0, 5, 8, 9] [inf, 0, 3, 4] [inf, inf, 0, 1] [inf, inf, inf, 0]
pickle.dump(data, f), pickle.load(f)¶
In [106]:
import pickle
data = {"name": "Alice", "age": 25, "is_student": False}
# Save object to file
with open("data.pkl", "wb") as f:
pickle.dump(data, f)
# Load object back
with open("data.pkl", "rb") as f:
loaded_data = pickle.load(f)
print("Loaded:", loaded_data)
Loaded: {'name': 'Alice', 'age': 25, 'is_student': False}
pickle.dump(data, f), pickle.load(f) inside a folder¶
In [107]:
import os
import pickle
# Data to pickle
data = {"name": "Alice", "age": 25}
# Folder and file path
folder = "saved_pickles"
file_path = os.path.join(folder, "data.pkl")
# 1. Create folder if it doesn't exist
if not os.path.exists(folder):
os.makedirs(folder) # creates folder
# 2. Save pickle inside that folder
with open(file_path, "wb") as f:
pickle.dump(data, f)
print(f"Pickle saved at {file_path}")
# 3. Load it back
with open(file_path, "rb") as f:
loaded = pickle.load(f)
print("Loaded object:", loaded)
Pickle saved at saved_pickles\data.pkl
Loaded object: {'name': 'Alice', 'age': 25}
In [108]:
pickle_folder = r'saved_pickles'
pickle_file = os.path.join(pickle_folder, 'data.pkl')
Get file names inside a folder¶
In [109]:
import os
pickle_folder = r"saved_pickles"
files = os.listdir(pickle_folder) # list all files & folders
print(files)
['data.pkl']
In [110]:
files = [os.path.join(pickle_folder, f) for f in os.listdir(pickle_folder)]
print(files)
['saved_pickles\\data.pkl']
Filter only .pkl files¶
In [111]:
files = [f for f in os.listdir(pickle_folder) if f.endswith(".pkl")]
print(files)
['data.pkl']
In [112]:
import glob
files = glob.glob(os.path.join(pickle_folder, "*.pkl"))
print(files)
['saved_pickles\\data.pkl']
Parallel Processing¶
Basic¶
In [113]:
import os
import numpy as np
#from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
# 1) Function to create the array with eqn initialized to 0
def make_data(N=100, seed=42,
x_range=(-10, 10), y_range=(-10, 10),
m_range=(-5, 5), b_range=(-5, 5)) -> np.ndarray:
"""
Return an (N, 5) array with columns:
0:x, 1:y, 2:m, 3:b, 4:eqn
x,y,m,b are random; eqn is initialized to 0.
"""
rng = np.random.default_rng(seed)
x = rng.uniform(*x_range, N)
y = rng.uniform(*y_range, N)
m = rng.uniform(*m_range, N)
b = rng.uniform(*b_range, N)
data = np.empty((N, 5), dtype=float)
data[:, 0] = x
data[:, 1] = y
data[:, 2] = m
data[:, 3] = b
data[:, 4] = 0.0 # eqn initialized to 0
return data
# 2) Your existing block function (kept here for completeness)
def compute_eqn_block(block_xy_mb: np.ndarray) -> np.ndarray:
"""
block_xy_mb: (k, 4) array with columns [x, y, m, b]
returns: (k,) array of y + m*x + b
"""
x = block_xy_mb[:, 0]
y = block_xy_mb[:, 1]
m = block_xy_mb[:, 2]
b = block_xy_mb[:, 3]
return y + m * x + b
# 3) Parallel fill of the 5th column (eqn) using up to 4 processes
def fill_eqn_threaded(data: np.ndarray, res_col_idx: int = 4, func = compute_eqn_block, max_workers: int = 4) -> None:
N = data.shape[0]
workers = min(max_workers, os.cpu_count() or 1)
idx_chunks = np.array_split(np.arange(N), workers)
def job(block): # local helper
return func(block)
with ThreadPoolExecutor(max_workers=workers) as ex:
futures = []
for idxs in idx_chunks:
if idxs.size == 0: continue
block = data[idxs, :res_col_idx] # no copy needed for threads, # (k,4) slice of rows, only x,y,m,b
futures.append((idxs, ex.submit(job, block)))
for idxs, fut in futures:
data[idxs, res_col_idx] = fut.result() #writes each computed value into the res_col_idx th column for the matching rows.
# Demo
data = make_data(N=1_000_000, seed=123)
print("Before:\n", data[:5])
fill_eqn_threaded(data, max_workers=4)
print("\nAfter:\n", data[:5])
Before: [[ 3.64703726 1.45309701 1.71880083 2.40181768 0. ] [-8.92357962 0.90030834 -0.69528411 1.634334 0. ] [-5.59280254 -1.23263745 -2.88959245 3.86790674 0. ] [-6.31256379 7.26584232 -4.24529389 4.71757123 0. ] [-6.48188198 9.30544053 -0.03679585 1.65487849 0. ]] After: [[ 3.64703726e+00 1.45309701e+00 1.71880083e+00 2.40181768e+00 1.01234454e+01] [-8.92357962e+00 9.00308339e-01 -6.95284110e-01 1.63433400e+00 8.73906545e+00] [-5.59280254e+00 -1.23263745e+00 -2.88959245e+00 3.86790674e+00 1.87961893e+01] [-6.31256379e+00 7.26584232e+00 -4.24529389e+00 4.71757123e+00 3.87821020e+01] [-6.48188198e+00 9.30544053e+00 -3.67958523e-02 1.65487849e+00 1.11988254e+01]]
Compare: Linear Equation¶
In [114]:
import os
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from time import perf_counter
# ---------- timing decorator ----------
def timed(label: str | None = None):
"""Decorator to print wall-clock runtime of a function."""
def deco(func):
@wraps(func)
def wrapper(*args, **kwargs):
t0 = perf_counter()
try:
return func(*args, **kwargs)
finally:
dt = perf_counter() - t0
name = label or func.__name__
print(f"[{name}] {dt:.3f} s")
return wrapper
return deco
# ---------- your helpers ----------
def make_data(N=100, seed=42,
x_range=(-10, 10), y_range=(-10, 10),
m_range=(-5, 5), b_range=(-5, 5)) -> np.ndarray:
rng = np.random.default_rng(seed)
x = rng.uniform(*x_range, N)
y = rng.uniform(*y_range, N)
m = rng.uniform(*m_range, N)
b = rng.uniform(*b_range, N)
data = np.empty((N, 5), dtype=float)
data[:, 0] = x
data[:, 1] = y
data[:, 2] = m
data[:, 3] = b
data[:, 4] = 0.0 # eqn initialized to 0
return data
def compute_eqn_block_linear(block_xy_mb: np.ndarray) -> np.ndarray:
x = block_xy_mb[:, 0]
y = block_xy_mb[:, 1]
m = block_xy_mb[:, 2]
b = block_xy_mb[:, 3]
return y + m * x + b
# ---------- threaded version (timed) ----------
@timed("threaded")
def fill_eqn_threaded(data: np.ndarray, res_col_idx: int = 4, func = compute_eqn_block_linear, max_workers: int = 4) -> None:
"""
In-place: computes eqn = y + m*x + b for each row and writes into data[:, res_col_idx].
Uses threads (good in notebooks; NumPy releases GIL during vector ops).
"""
N = data.shape[0]
workers = min(max_workers, os.cpu_count() or 1)
idx_chunks = np.array_split(np.arange(N), workers)
def job(block):
return func(block)
with ThreadPoolExecutor(max_workers=workers) as ex:
futures = []
for idxs in idx_chunks:
if idxs.size == 0:
continue
block = data[idxs, :4] # always pass x,y,m,b
futures.append((idxs, ex.submit(job, block)))
for idxs, fut in futures:
data[idxs, res_col_idx] = fut.result()
# ---------- pure NumPy vectorized baseline (timed) ----------
@timed("vectorized")
def fill_eqn_vectorized(data: np.ndarray, res_col_idx: int = 4) -> None:
"""
In-place vectorized: eqn = y + m*x + b (typically very fast).
"""
x = data[:, 0]
y = data[:, 1]
m = data[:, 2]
b = data[:, 3]
data[:, res_col_idx] = y + m * x + b
# ---------- demo ----------
N = 1_000_000
# Threaded
data_thr = make_data(N=N, seed=123)
fill_eqn_threaded(data_thr, res_col_idx=4, max_workers=4)
# Vectorized
data_vec = make_data(N=N, seed=123)
fill_eqn_vectorized(data_vec, res_col_idx=4)
# Optional: verify both methods produce the same results
same = np.allclose(data_thr[:, 4], data_vec[:, 4])
print("Results equal:", same)
# Show a peek
np.set_printoptions(precision=3, suppress=True)
print("Sample rows:\nThreaded:\n", data_thr[:5], "\nVectorized:\n", data_vec[:5])
[threaded] 0.024 s [vectorized] 0.011 s Results equal: True Sample rows: Threaded: [[ 3.647 1.453 1.719 2.402 10.123] [-8.924 0.9 -0.695 1.634 8.739] [-5.593 -1.233 -2.89 3.868 18.796] [-6.313 7.266 -4.245 4.718 38.782] [-6.482 9.305 -0.037 1.655 11.199]] Vectorized: [[ 3.647 1.453 1.719 2.402 10.123] [-8.924 0.9 -0.695 1.634 8.739] [-5.593 -1.233 -2.89 3.868 18.796] [-6.313 7.266 -4.245 4.718 38.782] [-6.482 9.305 -0.037 1.655 11.199]]
Compare: Polynomial Equation¶
In [115]:
import os
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from time import perf_counter
# ---------- timing decorator ----------
def timed(label: str | None = None):
"""Decorator to print wall-clock runtime of a function."""
def deco(func):
@wraps(func)
def wrapper(*args, **kwargs):
t0 = perf_counter()
try:
return func(*args, **kwargs)
finally:
dt = perf_counter() - t0
name = label or func.__name__
print(f"[{name}] {dt:.3f} s")
return wrapper
return deco
# ---------- your helpers ----------
def make_data(N=100, seed=42,
x_range=(-10, 10), y_range=(-10, 10),
m_range=(-5, 5), b_range=(-5, 5)) -> np.ndarray:
rng = np.random.default_rng(seed)
x = rng.uniform(*x_range, N)
y = rng.uniform(*y_range, N)
m = rng.uniform(*m_range, N)
b = rng.uniform(*b_range, N)
data = np.empty((N, 5), dtype=float)
data[:, 0] = x
data[:, 1] = y
data[:, 2] = m
data[:, 3] = b
data[:, 4] = 0.0 # eqn initialized to 0
return data
def compute_eqn_block_poly(block_xy_mb: np.ndarray) -> np.ndarray: # For thread
x = block_xy_mb[:, 0]
y = block_xy_mb[:, 1]
m = block_xy_mb[:, 2]
b = block_xy_mb[:, 3]
return y**10 + m * x ** 4 + b**3
# ---------- threaded version (timed) ----------
@timed("threaded")
def fill_eqn_threaded(data: np.ndarray, res_col_idx: int = 4, func = compute_eqn_block_poly, max_workers: int = 4) -> None:
"""
In-place: computes eqn = y + m*x + b for each row and writes into data[:, res_col_idx].
Uses threads (good in notebooks; NumPy releases GIL during vector ops).
"""
N = data.shape[0]
workers = min(max_workers, os.cpu_count() or 1)
idx_chunks = np.array_split(np.arange(N), workers)
def job(block):
return func(block)
with ThreadPoolExecutor(max_workers=workers) as ex:
futures = []
for idxs in idx_chunks:
if idxs.size == 0:
continue
block = data[idxs, :4] # always pass x,y,m,b
futures.append((idxs, ex.submit(job, block)))
for idxs, fut in futures:
data[idxs, res_col_idx] = fut.result()
# ---------- pure NumPy vectorized baseline (timed) ----------
@timed("vectorized")
def fill_eqn_vectorized(data: np.ndarray, res_col_idx: int = 4) -> None:
"""
In-place vectorized: eqn = y + m*x + b (typically very fast).
"""
x = data[:, 0]
y = data[:, 1]
m = data[:, 2]
b = data[:, 3]
data[:, res_col_idx] = y**10 + m * x ** 4 + b**3
# ---------- demo ----------
N = 300_000_000
# Threaded
data_thr = make_data(N=N, seed=123)
fill_eqn_threaded(data_thr, res_col_idx=4, max_workers=4)
# Vectorized
data_vec = make_data(N=N, seed=123)
fill_eqn_vectorized(data_vec, res_col_idx=4)
# Optional: verify both methods produce the same results
same = np.allclose(data_thr[:, 4], data_vec[:, 4])
print("Results equal:", same)
# Show a peek
np.set_printoptions(precision=3, suppress=True)
print("Sample rows:\nThreaded:\n", data_thr[:5], "\nVectorized:\n", data_vec[:5])
[threaded] 11.626 s [vectorized] 22.317 s Results equal: True Sample rows: Threaded: [[ 3.647e+00 -9.145e+00 2.247e+00 1.347e+00 4.093e+09] [-8.924e+00 -1.538e+00 2.155e+00 -2.811e+00 1.372e+04] [-5.593e+00 4.434e+00 -3.733e+00 -1.544e+00 2.933e+06] [-6.313e+00 -1.618e-01 3.070e+00 -3.324e+00 4.838e+03] [-6.482e+00 2.431e+00 3.817e+00 -3.901e+00 1.388e+04]] Vectorized: [[ 3.647e+00 -9.145e+00 2.247e+00 1.347e+00 4.093e+09] [-8.924e+00 -1.538e+00 2.155e+00 -2.811e+00 1.372e+04] [-5.593e+00 4.434e+00 -3.733e+00 -1.544e+00 2.933e+06] [-6.313e+00 -1.618e-01 3.070e+00 -3.324e+00 4.838e+03] [-6.482e+00 2.431e+00 3.817e+00 -3.901e+00 1.388e+04]]
Drop-in Numba version (fast, parallel, no ThreadPool)¶
Exactly 👍 — that’s the sweet spot for Numba.
When you use @njit, Numba translates your function into LLVM IR (via llvmlite), so it runs close to C speed, but it only supports a restricted Python + NumPy subset.
✅ Supported (works well in Numba)¶
- NumPy arrays: creation, slicing, broadcasting (basic).
- Math functions:
np.min,np.max,np.sum,np.mean,np.std,np.sqrt,np.exp,np.log,np.floor,np.ceil, etc. - Basic control flow:
for,while,if/else,break,continue. - Operators:
+ - * / % **, comparisons, boolean logic. - Random numbers:
np.random.rand,np.random.randn, etc. (with some caveats). - Loops:
prange(parallel loop), normalrange.
⚠️ Limited / Restricted¶
- Not all NumPy functions are supported (e.g.,
np.linalgis partial,np.fftnot supported). - No full object dtype, only numeric arrays (
int32,int64,float32,float64, bool). - No arbitrary Python objects (lists of lists, dicts, sets, classes).
- Limited string support (basically none).
- Exception handling is minimal (no
try/exceptinsidenjit).
❌ Not supported¶
- Pandas (as you asked earlier).
- High-level Python features like generators, context managers, dynamic typing.
- Some
NumPyadvanced features (structured arrays, masked arrays, etc.).
👉 So if your logic is “array math + loops + conditionals + simple NumPy functions”, Numba will JIT-compile it to very fast machine code.
Do you want me to give you a reference list of the most useful NumPy functions that Numba supports (like a cheat sheet)?
In [2]:
#conda remove -y numpy numba llvmlite
#conda install -y numpy=2.3 numba llvmlite
In [3]:
# works for CPU, not GPU reliably
# pip install numba numpy
# pip show numba
# pip show llvmlite
import os
import numpy as np
from functools import wraps
from time import perf_counter
from numba import njit, prange, set_num_threads, get_num_threads
# ---------- timing decorator ----------
def timed(label: str | None = None):
def deco(func):
@wraps(func)
def wrapper(*args, **kwargs):
t0 = perf_counter()
try:
return func(*args, **kwargs)
finally:
dt = perf_counter() - t0
print(f"[{label or func.__name__}] {dt:.3f} s")
return wrapper
return deco
# ---------- data maker ----------
def make_data(N=100, seed=42,
x_range=(-10, 10), y_range=(-10, 10),
m_range=(-5, 5), b_range=(-5, 5)) -> np.ndarray:
rng = np.random.default_rng(seed)
data = np.empty((N, 5), dtype=np.float64)
data[:, 0] = rng.uniform(*x_range, N) # x
data[:, 1] = rng.uniform(*y_range, N) # y
data[:, 2] = rng.uniform(*m_range, N) # m
data[:, 3] = rng.uniform(*b_range, N) # b
data[:, 4] = 0.0 # eqn
return data
# ---------- Numba kernel: parallel & fast ----------
@njit(parallel=True, fastmath=True) # fastmath lets LLVM use fused ops / vectorization
def fill_eqn_numba(data, res_col_idx=4):
x = data[:, 0]
y = data[:, 1]
m = data[:, 2]
b = data[:, 3]
out = data[:, res_col_idx]
n = x.shape[0]
# Use prange to split across threads
for i in prange(n):
# Sometimes manual powers are faster than ** for big exponents
x2 = x[i] * x[i]
x4 = x2 * x2
b3 = b[i] * b[i] * b[i]
# y**10 via repeated squaring (often faster/more stable than pow in tight loops)
y2 = y[i] * y[i]
y4 = y2 * y2
y8 = y4 * y4
y10 = y8 * y2
out[i] = y10 + m[i] * x4 + b3
# ---------- vectorized baseline ----------
@timed("vectorized")
def fill_eqn_vectorized(data, res_col_idx=4):
x = data[:, 0]; y = data[:, 1]; m = data[:, 2]; b = data[:, 3]
# Beware: these allocate temporaries; with huge N this can be heavy on RAM
data[:, res_col_idx] = y**10 + m * (x**4) + b**3
# ---------- numba wrapper with timing (includes first-call compile) ----------
@timed("numba_parallel")
def run_numba(data, res_col_idx=4, threads=None):
if threads:
set_num_threads(threads)
fill_eqn_numba(data, res_col_idx)
# print(f"Numba used {get_num_threads()} threads") # uncomment if you want to see it
N = 300_000_000 # start smaller than 100M unless you have lots of RAM
data_nb = make_data(N=N, seed=123)
run_numba(data_nb, threads=4) # JIT compiles on first call, then runs fast
data_vec = make_data(N=N, seed=123)
fill_eqn_vectorized(data_vec)
print("Equal?", np.allclose(data_nb[:,4], data_vec[:,4]))
print(data_nb[:5])
[numba_parallel] 1.028 s [vectorized] 23.581 s Equal? True [[ 3.64703726e+00 -9.14534317e+00 2.24687569e+00 1.34727556e+00 4.09260762e+09] [-8.92357962e+00 -1.53822569e+00 2.15513093e+00 -2.81057371e+00 1.37176067e+04] [-5.59280254e+00 4.43382736e+00 -3.73319487e+00 -1.54355059e+00 2.93255834e+06] [-6.31256379e+00 -1.61827822e-01 3.07012645e+00 -3.32402093e+00 4.83832595e+03] [-6.48188198e+00 2.43070411e+00 3.81653303e+00 -3.90052049e+00 1.38775736e+04]]
CUDA cores → GPU threads (Numba @cuda.jit)¶
In [4]:
# conda create -n numba-cuda -c conda-forge python=3.12 numba numpy cudatoolkit
# conda activate numba-cuda
# conda install -c conda-forge jupyterlab
# pip uninstall -y numba llvmlite
# conda install -c conda-forge numba cudatoolkit
import os
import numpy as np
from functools import wraps
from time import perf_counter
from numba import njit, prange, set_num_threads, get_num_threads
from numba import cuda
# ---------- timing decorator ----------
def timed(label: str | None = None):
def deco(func):
@wraps(func)
def wrapper(*args, **kwargs):
t0 = perf_counter()
try:
return func(*args, **kwargs)
finally:
dt = perf_counter() - t0
print(f"[{label or func.__name__}] {dt:.3f} s")
return wrapper
return deco
# ---------- data maker ----------
def make_data(N=100, seed=42,
x_range=(-10, 10), y_range=(-10, 10),
m_range=(-5, 5), b_range=(-5, 5),
dtype=np.float64) -> np.ndarray:
rng = np.random.default_rng(seed)
data = np.empty((N, 5), dtype=dtype)
data[:, 0] = rng.uniform(*x_range, N) # x
data[:, 1] = rng.uniform(*y_range, N) # y
data[:, 2] = rng.uniform(*m_range, N) # m
data[:, 3] = rng.uniform(*b_range, N) # b
data[:, 4] = 0.0 # eqn
return data
# ---------- Numba CPU kernel ----------
@njit(parallel=True, fastmath=True)
def fill_eqn_numba(data, res_col_idx=4):
x = data[:, 0]
y = data[:, 1]
m = data[:, 2]
b = data[:, 3]
out = data[:, res_col_idx]
n = x.shape[0]
for i in prange(n):
x2 = x[i] * x[i]; x4 = x2 * x2
b3 = b[i] * b[i] * b[i]
y2 = y[i] * y[i]; y4 = y2 * y2; y8 = y4 * y4; y10 = y8 * y2
out[i] = y10 + m[i] * x4 + b3
@timed("vectorized")
def fill_eqn_vectorized(data, res_col_idx=4):
x = data[:, 0]; y = data[:, 1]; m = data[:, 2]; b = data[:, 3]
data[:, res_col_idx] = y**10 + m * (x**4) + b**3
@timed("numba_parallel")
def run_numba(data, res_col_idx=4, threads=None):
if threads:
set_num_threads(threads)
fill_eqn_numba(data, res_col_idx)
# ---------- Numba CUDA kernel ----------
# Works with float32 or float64; float32 is faster on most GPUs.
@cuda.jit(fastmath=True)
def fill_eqn_cuda(x, y, m, b, out):
i = cuda.grid(1)
if i < x.size:
xi = x[i]; yi = y[i]; mi = m[i]; bi = b[i]
x2 = xi * xi; x4 = x2 * x2
b3 = bi * bi * bi
y2 = yi * yi; y4 = y2 * y2; y8 = y4 * y4; y10 = y8 * y2
out[i] = y10 + mi * x4 + b3
@timed("cuda")
def run_cuda(data, res_col_idx=4, threads_per_block=256, use_float32=False):
"""
Compute eqn on GPU and write back into data[:, res_col_idx].
Set use_float32=True for speed (but results differ slightly from float64).
"""
N = data.shape[0]
# pick dtype
dtype = np.float32 if use_float32 else np.float64
# slice host arrays and make them contiguous for the GPU
x_h = np.ascontiguousarray(data[:, 0], dtype=dtype)
y_h = np.ascontiguousarray(data[:, 1], dtype=dtype)
m_h = np.ascontiguousarray(data[:, 2], dtype=dtype)
b_h = np.ascontiguousarray(data[:, 3], dtype=dtype)
# allocate device arrays and copy
d_x = cuda.to_device(x_h)
d_y = cuda.to_device(y_h)
d_m = cuda.to_device(m_h)
d_b = cuda.to_device(b_h)
d_out = cuda.device_array(N, dtype=dtype)
blocks = (N + threads_per_block - 1) // threads_per_block
fill_eqn_cuda[blocks, threads_per_block](d_x, d_y, d_m, d_b, d_out)
# copy back to host; if original data is float64 and we used float32,
# upcast on assignment (with small numeric differences).
data[:, res_col_idx] = d_out.copy_to_host().astype(data.dtype, copy=False)
@timed("cuda_chunked")
def run_cuda_chunked(data, res_col_idx=4, threads_per_block=256, chunk_size=5_000_000, use_float32=False):
dtype = np.float32 if use_float32 else np.float64
N = data.shape[0]
for start in range(0, N, chunk_size):
end = min(start + chunk_size, N)
# slice host arrays and make them contiguous for the GPU
x_h = np.ascontiguousarray(data[start:end, 0], dtype=dtype)
y_h = np.ascontiguousarray(data[start:end, 1], dtype=dtype)
m_h = np.ascontiguousarray(data[start:end, 2], dtype=dtype)
b_h = np.ascontiguousarray(data[start:end, 3], dtype=dtype)
d_x = cuda.to_device(x_h)
d_y = cuda.to_device(y_h)
d_m = cuda.to_device(m_h)
d_b = cuda.to_device(b_h)
d_out = cuda.device_array(end - start, dtype=dtype)
blocks = ((end - start) + threads_per_block - 1) // threads_per_block
fill_eqn_cuda[blocks, threads_per_block](d_x, d_y, d_m, d_b, d_out)
data[start:end, res_col_idx] = d_out.copy_to_host().astype(data.dtype, copy=False)
# ---------- demo ----------
if __name__ == "__main__":
# Use a size that fits your GPU RAM; float64 needs 8*N bytes per column.
N = 30_000_000 # try this first; scale up if your GPU has more memory
# CPU Numba
data_nb = make_data(N=N, seed=123, dtype=np.float64)
run_numba(data_nb, threads=os.cpu_count())
# Vectorized
data_vec = make_data(N=N, seed=123, dtype=np.float64)
fill_eqn_vectorized(data_vec)
print("Equal (CPU paths)?", np.allclose(data_nb[:, 4], data_vec[:, 4]))
# CUDA (float64 for apples-to-apples equality; use_float32=True for speed)
data_gpu = make_data(N=N, seed=123, dtype=np.float64)
run_cuda(data_gpu, res_col_idx=4, threads_per_block=256, use_float32=False)
print("Equal (GPU vs vectorized, float64)?", np.allclose(data_gpu[:, 4], data_vec[:, 4]))
#print("Sample rows:\n", data_gpu[:5])
# Fits big N by processing in batches on the GPU
data_gpu_chunk = make_data(N=N, seed=123, dtype=np.float64)
run_cuda_chunked(data_gpu_chunk, res_col_idx=4, threads_per_block=256,
chunk_size=5_000_000, # tune this (see below)
use_float32=False) # True = faster & less memory, slight numeric diffs
print("Equal (GPU_Chunked vs vectorized, float64)?", np.allclose(data_gpu_chunk[:, 4], data_vec[:, 4]))
[numba_parallel] 0.411 s [vectorized] 2.434 s Equal (CPU paths)? True [cuda] 1.129 s Equal (GPU vs vectorized, float64)? True [cuda_chunked] 0.682 s Equal (GPU_Chunked vs vectorized, float64)? True
Chunked¶
In [5]:
import os
import numpy as np
from functools import wraps
from time import perf_counter
from numba import njit, prange, set_num_threads, get_num_threads
from numba import cuda
# ---------- timing decorator ----------
def timed(label: str | None = None):
def deco(func):
@wraps(func)
def wrapper(*args, **kwargs):
t0 = perf_counter()
try:
return func(*args, **kwargs)
finally:
dt = perf_counter() - t0
print(f"[{label or func.__name__}] {dt:.3f} s")
return wrapper
return deco
# ---------- data maker ----------
def make_data(N=100, seed=42,
x_range=(-10, 10), y_range=(-10, 10),
m_range=(-5, 5), b_range=(-5, 5),
dtype=np.float64) -> np.ndarray:
rng = np.random.default_rng(seed)
data = np.empty((N, 5), dtype=dtype)
data[:, 0] = rng.uniform(*x_range, N) # x
data[:, 1] = rng.uniform(*y_range, N) # y
data[:, 2] = rng.uniform(*m_range, N) # m
data[:, 3] = rng.uniform(*b_range, N) # b
data[:, 4] = 0.0 # eqn
return data
# ---------- Numba CPU kernel ----------
@njit(parallel=True, fastmath=True)
def fill_eqn_numba(data, res_col_idx=4):
x = data[:, 0]
y = data[:, 1]
m = data[:, 2]
b = data[:, 3]
out = data[:, res_col_idx]
n = x.shape[0]
for i in prange(n):
x2 = x[i] * x[i]; x4 = x2 * x2
b3 = b[i] * b[i] * b[i]
y2 = y[i] * y[i]; y4 = y2 * y2; y8 = y4 * y4; y10 = y8 * y2
out[i] = y10 + m[i] * x4 + b3
@timed("vectorized")
def fill_eqn_vectorized(data, res_col_idx=4):
x = data[:, 0]; y = data[:, 1]; m = data[:, 2]; b = data[:, 3]
data[:, res_col_idx] = y**10 + m * (x**4) + b**3
# ---------- chunked Numba (CPU) ----------
@timed("numba_chunked")
def run_numba_chunked(data, res_col_idx=4, chunk_size=5_000_000, threads=None):
"""
Calls the same @njit(parallel=True) kernel on row-slices of `data`.
Good when temporaries/cache become the bottleneck on huge arrays.
"""
if threads:
set_num_threads(threads)
N = data.shape[0]
for start in range(0, N, chunk_size):
end = min(start + chunk_size, N)
# Row slices are C-contiguous → zero-copy views
fill_eqn_numba(data[start:end, :], res_col_idx)
# ---------- chunked vectorized ----------
@timed("vectorized_chunked")
def fill_eqn_vectorized_chunked(data, res_col_idx=4, chunk_size=5_000_000):
"""
Vectorized per-chunk to limit temporary array sizes.
"""
N = data.shape[0]
for start in range(0, N, chunk_size):
end = min(start + chunk_size, N)
x = data[start:end, 0]
y = data[start:end, 1]
m = data[start:end, 2]
b = data[start:end, 3]
data[start:end, res_col_idx] = y**10 + m*(x**4) + b**3
# Numba (CPU) in chunks
data_nb = make_data(N=300_000_000, seed=123, dtype=np.float64)
#run_numba_chunked(data_nb, res_col_idx=4, chunk_size=5_000_000, threads=os.cpu_count())
run_numba_chunked(data_nb, res_col_idx=4, chunk_size=5_000_000, threads=4)
# Vectorized in chunks
data_vec = make_data(N=300_000_000, seed=123, dtype=np.float64)
fill_eqn_vectorized_chunked(data_vec, res_col_idx=4, chunk_size=5_000_000)
print("Equal?", np.allclose(data_nb[:,4], data_vec[:,4]))
[numba_chunked] 0.999 s [vectorized_chunked] 23.083 s Equal? True
numba module basics for future use¶
In [6]:
# numba module basics for future use ________________________________________
import os
import numpy as np
from functools import wraps
from time import perf_counter
from concurrent.futures import ThreadPoolExecutor
from numba import njit, prange, set_num_threads, get_num_threads
from numba import cuda
#////////////////////////////////////////////////////////////////////////////////////////////////////
#___________________________________________________________________________________________________________
#___________________________________________________________________________________________________________
# ---------- timing decorator ----------
def timed(label: str | None = None):
def deco(func):
@wraps(func)
def wrapper(*args, **kwargs):
t0 = perf_counter()
try:
return func(*args, **kwargs)
finally:
dt = perf_counter() - t0
print(f"[{label or func.__name__}] {dt:.3f} s")
return wrapper
return deco
#////////////////////////////////////////////////////////////////////////////////////////////////////
#___________________________________________________________________________________________________________
#___________________________________________________________________________________________________________
# ---------- Sample data maker for testing code ----------
# --- update the equations, as your requirements *************************
def make_data(N=100, seed=42,
x_range=(-10, 10), y_range=(-10, 10),
m_range=(-5, 5), b_range=(-5, 5),
dtype=np.float64) -> np.ndarray:
rng = np.random.default_rng(seed)
data = np.empty((N, 5), dtype=dtype)
data[:, 0] = rng.uniform(*x_range, N) # x
data[:, 1] = rng.uniform(*y_range, N) # y
data[:, 2] = rng.uniform(*m_range, N) # m
data[:, 3] = rng.uniform(*b_range, N) # b
data[:, 4] = 0.0 # eqn
return data
#////////////////////////////////////////////////////////////////////////////////////////////////////
#___________________________________________________________________________________________________________
#___________________________________________________________________________________________________________
# ---------- Numba CPU kernel equation----------
# --- update the equations *************************
@njit(parallel=True, fastmath=True)
def fill_eqn_numba(data, res_col_idx=4):
x = data[:, 0]
y = data[:, 1]
m = data[:, 2]
b = data[:, 3]
out = data[:, res_col_idx]
n = x.shape[0]
for i in prange(n):
x2 = x[i] * x[i]; x4 = x2 * x2
b3 = b[i] * b[i] * b[i]
y2 = y[i] * y[i]; y4 = y2 * y2; y8 = y4 * y4; y10 = y8 * y2
out[i] = y10 + m[i] * x4 + b3
# ---------- chunked Numba (CPU) ----------
@timed("numba_chunked")
def run_numba_chunked(data, res_col_idx=4, chunk_size=5_000_000, threads=None, func = fill_eqn_numba):
"""
Calls the same @njit(parallel=True) kernel on row-slices of `data`.
Good when temporaries/cache become the bottleneck on huge arrays.
"""
if threads:
set_num_threads(threads)
N = data.shape[0]
for start in range(0, N, chunk_size):
end = min(start + chunk_size, N)
# Row slices are C-contiguous → zero-copy views
func(data[start:end, :], res_col_idx)
#////////////////////////////////////////////////////////////////////////////////////////////////////
#___________________________________________________________________________________________________________
#___________________________________________________________________________________________________________
# ---------- Numba CUDA kernel ----------
# --- update the equations *************************
# Works with float32 or float64; float32 is faster on most GPUs.
@cuda.jit(fastmath=True)
def fill_eqn_cuda(x, y, m, b, out):
i = cuda.grid(1)
if i < x.size:
xi = x[i]; yi = y[i]; mi = m[i]; bi = b[i]
x2 = xi * xi; x4 = x2 * x2
b3 = bi * bi * bi
y2 = yi * yi; y4 = y2 * y2; y8 = y4 * y4; y10 = y8 * y2
out[i] = y10 + mi * x4 + b3
# ---------- chunked Numba (GPU) ----------
@timed("cuda_chunked")
def run_cuda_chunked(data, res_col_idx=4, threads_per_block=256, chunk_size=5_000_000, use_float32=False,
func=fill_eqn_cuda):
dtype = np.float32 if use_float32 else np.float64
N = data.shape[0]
for start in range(0, N, chunk_size):
end = min(start + chunk_size, N)
# slice host arrays and make them contiguous for the GPU
x_h = np.ascontiguousarray(data[start:end, 0], dtype=dtype)
y_h = np.ascontiguousarray(data[start:end, 1], dtype=dtype)
m_h = np.ascontiguousarray(data[start:end, 2], dtype=dtype)
b_h = np.ascontiguousarray(data[start:end, 3], dtype=dtype)
d_x = cuda.to_device(x_h)
d_y = cuda.to_device(y_h)
d_m = cuda.to_device(m_h)
d_b = cuda.to_device(b_h)
d_out = cuda.device_array(end - start, dtype=dtype)
blocks = ((end - start) + threads_per_block - 1) // threads_per_block
func[blocks, threads_per_block](d_x, d_y, d_m, d_b, d_out)
data[start:end, res_col_idx] = d_out.copy_to_host().astype(data.dtype, copy=False)
#////////////////////////////////////////////////////////////////////////////////////////////////////
#___________________________________________________________________________________________________________
#___________________________________________________________________________________________________________
# ---------- chunked python_ThreadPoolExecutor (GPU) ----------
# Your existing block function (kept here for completeness)
# --- update the equations *************************
def compute_eqn_block(block_xy_mb: np.ndarray) -> np.ndarray:
"""
block_xy_mb: (k, 4) array with columns [x, y, m, b]
returns: (k,) array of y + m*x + b
"""
x = block_xy_mb[:, 0]
y = block_xy_mb[:, 1]
m = block_xy_mb[:, 2]
b = block_xy_mb[:, 3]
return y + m * x + b
@timed("python_ThreadPoolExecutor_chunked")
def fill_eqn_threaded_chunked(data: np.ndarray,
res_col_idx: int = 4,
func=compute_eqn_block,
max_workers: int = 4,
chunk_size: int = 5_000_000) -> None:
"""
Processes data in chunks to reduce memory pressure, each chunk is threaded internally.
"""
N = data.shape[0]
workers = min(max_workers, os.cpu_count() or 1)
for start in range(0, N, chunk_size):
end = min(start + chunk_size, N)
chunk = data[start:end, :] # slice view, no copy
# Split this chunk among workers
idx_chunks = np.array_split(np.arange(chunk.shape[0]), workers)
def job(idxs):
block = chunk[idxs, :res_col_idx] # only x,y,m,b
return func(block)
with ThreadPoolExecutor(max_workers=workers) as ex:
futures = [(idxs, ex.submit(job, idxs)) for idxs in idx_chunks if idxs.size > 0]
for idxs, fut in futures:
chunk[idxs, res_col_idx] = fut.result()
# Writes directly into `data[start:end, res_col_idx]` since `chunk` is a view
#////////////////////////////////////////////////////////////////////////////////////////////////////
#___________________________________________________________________________________________________________
#___________________________________________________________________________________________________________
# Get CPU and GPU info __________________________
def print_cpu_info():
import os
import platform
try:
import psutil
except ImportError:
psutil = None
print("CPU Name:", platform.processor() or platform.machine())
print("Architecture:", platform.architecture()[0])
print("Logical CPUs:", os.cpu_count())
if psutil:
print("Physical cores:", psutil.cpu_count(logical=False))
print("Max frequency (MHz):", psutil.cpu_freq().max if psutil.cpu_freq() else "N/A")
print("Current frequency (MHz):", psutil.cpu_freq().current if psutil.cpu_freq() else "N/A")
print("Total RAM (GB):", round(psutil.virtual_memory().total / (1024**3), 2))
print(f'get_num_threads_cpu: {get_num_threads()}')
print('_'*25)
else:
print("Install psutil for more details (pip install psutil)")
print('_'*25)
def print_cuda_info():
dev = cuda.get_current_device()
print("Name:", dev.name)
print("SMs:", dev.MULTIPROCESSOR_COUNT)
print("Warp size:", dev.WARP_SIZE)
print("Max threads/block:", dev.MAX_THREADS_PER_BLOCK)
print('_'*25)
def pick_chunk_size(use_float32=False, safety=0.8):
free, _ = cuda.current_context().get_memory_info()
dtype_size = 4 if use_float32 else 8
per_elem = 5 * dtype_size
chunk_size = max(1, int(safety * free // per_elem))
print(f'chunk_size : {chunk_size}')
print('_'*25)
return chunk_size
#////////////////////////////////////////////////////////////////////////////////////////////////////
#___________________________________________________________________________________________________________
#___________________________________________________________________________________________________________
# ---------- chunked vectorized ----------
# --- update the equations *************************
@timed("vectorized_chunked")
def fill_eqn_vectorized_chunked(data, res_col_idx=4, chunk_size=5_000_000):
"""
Vectorized per-chunk to limit temporary array sizes.
"""
N = data.shape[0]
for start in range(0, N, chunk_size):
end = min(start + chunk_size, N)
x = data[start:end, 0]
y = data[start:end, 1]
m = data[start:end, 2]
b = data[start:end, 3]
data[start:end, res_col_idx] = y**10 + m*(x**4) + b**3
#////////////////////////////////////////////////////////////////////////////////////////////////////
#___________________________________________________________________________________________________________
#___________________________________________________________________________________________________________
#___________________________________________________________________________________________________________
#___________________________________________________________________________________________________________
# --- Execution *************************
data_nb = make_data(N=300_000_000, seed=123, dtype=np.float64)
data_nb_cuda = data_nb.copy()
data_nb_cuda_opt = data_nb.copy()
data_thread = data_nb.copy()
data_vec = data_nb.copy()
# Numba (CPU) in chunks ___________________________________________________________________
print_cpu_info()
#run_numba_chunked(data_nb, res_col_idx=4, chunk_size=5_000_000, threads=os.cpu_count())
run_numba_chunked(data_nb, res_col_idx=4, chunk_size=5_000_000, threads=4)
#///////////////////////////////////////////////////////////////////////////////////////////
# Fits big N by processing in batches on the GPU _________________________________________________
run_cuda_chunked(data_nb_cuda, res_col_idx=4, threads_per_block=256,
chunk_size=5_000_000, # tune this (see below)
use_float32=False) # True = faster & less memory, slight numeric diffs
#______________CUDA Optimized____________________
print_cuda_info()
chunk_size = pick_chunk_size()
run_cuda_chunked(data_nb_cuda_opt, res_col_idx=4, threads_per_block=512,
chunk_size=chunk_size, # tuned this
use_float32=False) # True = faster & less memory, slight numeric diffs
#///////////////////////////////////////////////////////////////////////////////////////////
# python_ThreadPoolExecutor in chunks ___________________________________________________________________
fill_eqn_threaded_chunked(data_thread, max_workers=4, chunk_size=5_000_000)
#///////////////////////////////////////////////////////////////////////////////////////////
# Vectorized in chunks ___________________________________________________________________
fill_eqn_vectorized_chunked(data_vec, res_col_idx=4, chunk_size=5_000_000)
#///////////////////////////////////////////////////////////////////////////////////////////
#___________________________________________________________________________________________________________
#___________________________________________________________________________________________________________
#___________________________________________________________________________________________________________
print("Equal (Numba vs vectorized, float64)?", np.allclose(data_nb[:, 4], data_vec[:, 4]))
print("Equal (Numba_GPU vs vectorized, float64)?", np.allclose(data_nb_cuda[:, 4], data_vec[:, 4]))
print("Equal (Numba_GPU_Opt vs vectorized, float64)?", np.allclose(data_nb_cuda_opt[:, 4], data_vec[:, 4]))
print("Equal (python_ThreadPoolExecutor vs vectorized, float64)?", np.allclose(data_thread[:, 4], data_vec[:, 4]))
CPU Name: Intel64 Family 6 Model 141 Stepping 1, GenuineIntel Architecture: 64bit Logical CPUs: 16 Physical cores: 8 Max frequency (MHz): 2611.0 Current frequency (MHz): 2611.0 Total RAM (GB): 63.2 get_num_threads_cpu: 4 _________________________ [numba_chunked] 3.917 s [cuda_chunked] 46.583 s Name: b'NVIDIA RTX A4000 Laptop GPU' SMs: 40 Warp size: 32 Max threads/block: 1024 _________________________ chunk_size : 144997089 _________________________ [cuda_chunked] 38.953 s [python_ThreadPoolExecutor_chunked] 15.292 s [vectorized_chunked] 85.214 s Equal (Numba vs vectorized, float64)? True Equal (Numba_GPU vs vectorized, float64)? True Equal (Numba_GPU_Opt vs vectorized, float64)? True Equal (python_ThreadPoolExecutor vs vectorized, float64)? False
Numba for Masking¶
In [3]:
# conda install -c conda-forge matplotlib
In [4]:
# numba_rect_mask_demo.py
# pip install numba numpy matplotlib pillow
# ------------------------------------------------------------
# Synthetic images + labels/bboxes + Numba rectangle masks,
# with side-by-side visualization using matplotlib.
#
# Quick start (CPU only):
# pip install numba numpy matplotlib pillow
# python numba_rect_mask_demo.py
#
# Notes:
# - Numba JIT-compiles the rectangles→mask routine on first call; that's why
# the first run is slower than subsequent runs.
# - Everything here is CPU. If you later want GPU, prefer vectorized PyTorch
# or a dedicated CUDA kernel. Numba can also do CUDA, but that's a different
# code path.
# ------------------------------------------------------------
from dataclasses import dataclass
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw, ImageFont
# ---------- Optional acceleration with Numba ----------
# We try to import Numba. If it's missing, the code falls back to a pure-NumPy
# version of the same kernel so the script still runs end-to-end.
try:
from numba import njit, prange
USE_NUMBA = True
except Exception as e:
USE_NUMBA = False
print("Numba not available, falling back to pure NumPy. Error:", e)
# ---------- Rectangles → instance masks (per image) ----------
# This function turns N axis-aligned boxes into N binary masks of size HxW.
# Each mask[i] is 1 inside the i-th box, 0 elsewhere.
if USE_NUMBA:
@njit(parallel=True, fastmath=False)
def rect_masks_numba(boxes, H, W):
"""
Args:
boxes : float32/float64 array of shape [N, 4] with (x1, y1, x2, y2) in pixel coords
H, W : output mask height and width
Returns:
masks : uint8 array of shape [N, H, W] with values {0,1}
"""
N = boxes.shape[0]
# Allocate all masks in one go. dtype=uint8 keeps memory small; we can
# cast to bool later for frameworks that prefer it.
out = np.zeros((N, H, W), dtype=np.uint8)
# prange parallelizes the outer loop across boxes; good when there are many boxes.
for i in prange(N):
# Read and integerize coordinates (safe for pixel indexing)
x1 = int(boxes[i, 0]); y1 = int(boxes[i, 1])
x2 = int(boxes[i, 2]); y2 = int(boxes[i, 3])
# Ensure (x1,y1) is top-left and (x2,y2) is bottom-right
if x2 < x1: x1, x2 = x2, x1
if y2 < y1: y1, y2 = y2, y1
# Clamp to image bounds to avoid index errors
if x1 < 0: x1 = 0
if y1 < 0: y1 = 0
if x2 > W: x2 = W
if y2 > H: y2 = H
# Only write if the box has positive area
if x2 > x1 and y2 > y1:
out[i, y1:y2, x1:x2] = 1
return out
else:
# Drop-in replacement when Numba isn't available.
def rect_masks_numba(boxes, H, W):
N = boxes.shape[0]
out = np.zeros((N, H, W), dtype=np.uint8)
for i in range(N):
x1 = int(boxes[i, 0]); y1 = int(boxes[i, 1])
x2 = int(boxes[i, 2]); y2 = int(boxes[i, 3])
if x2 < x1: x1, x2 = x2, x1
if y2 < y1: y1, y2 = y2, y1
x1 = max(0, min(W, x1)); x2 = max(0, min(W, x2))
y1 = max(0, min(H, y1)); y2 = max(0, min(H, y2))
if x2 > x1 and y2 > y1:
out[i, y1:y2, x1:x2] = 1
return out
# ---------- Simple container for one synthetic sample ----------
@dataclass
class Sample:
image: np.ndarray # shape [H, W, 3], dtype=uint8 (RGB)
boxes: np.ndarray # shape [N, 4], dtype=float32, (x1,y1,x2,y2)
labels: np.ndarray # shape [N], dtype=int64
masks: np.ndarray = None # shape [N, H, W], dtype=uint8 (filled later)
# ---------- Random box generator ----------
def random_boxes(H, W, n_boxes, rng):
"""
Produces n_boxes axis-aligned rectangles inside the image.
We also enforce a minimum width/height so boxes are visible.
"""
boxes = np.zeros((n_boxes, 4), dtype=np.float32)
for i in range(n_boxes):
# random top-left
x1 = rng.integers(0, W - 1)
y1 = rng.integers(0, H - 1)
# random width/height within a reasonable range
bw = int(rng.integers(max(4, W // 16), max(8, W // 3)))
bh = int(rng.integers(max(4, H // 16), max(8, H // 3)))
# bottom-right, clamped to image
x2 = min(W, x1 + bw)
y2 = min(H, y1 + bh)
# boxes are (x1, y1, x2, y2)
boxes[i] = (min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
return boxes
# ---------- Create B synthetic samples ----------
def make_synthetic_samples(B=3, H=256, W=256, seed=7):
"""
Returns a list of Samples with:
- a smooth gradient + noise background image
- 2..5 random boxes per image
- labels in {1,2,3,4}
"""
rng = np.random.default_rng(seed)
samples = []
for _ in range(B):
# Make a pseudo-natural background: gradient + Gaussian noise
gy = np.linspace(0, 1, H, dtype=np.float32)[:, None]
gx = np.linspace(0, 1, W, dtype=np.float32)[None, :]
base = 0.6 * gy + 0.4 * gx + 0.05 * rng.standard_normal((H, W)).astype(np.float32)
base = np.clip(base, 0, 1)
# Stack slightly shifted channels so it looks more colorful
img = (255 * np.stack([
base,
np.roll(base, 12, axis=1),
np.roll(base, 24, axis=0)
], axis=-1)).astype(np.uint8)
# Random boxes and integer labels
n_boxes = int(rng.integers(2, 6)) # 2..5 per image
boxes = random_boxes(H, W, n_boxes, rng)
labels = rng.integers(1, 5, size=(n_boxes,), dtype=np.int64)
samples.append(Sample(image=img, boxes=boxes, labels=labels))
return samples
# ---------- Visualization helpers ----------
def draw_boxes_on_image(img_rgb, boxes, labels):
"""
Returns a copy of img_rgb with red rectangles and small label badges.
PIL is used for convenience; cv2 would work as well.
"""
im = Image.fromarray(img_rgb.copy())
dr = ImageDraw.Draw(im)
try:
font = ImageFont.load_default()
except Exception:
font = None
for i, box in enumerate(boxes):
x1, y1, x2, y2 = [int(v) for v in box.tolist()]
# Draw rectangle
dr.rectangle([x1, y1, x2, y2], outline=(255, 0, 0), width=2)
# A tiny label "badge" near the top-left corner
txt = str(int(labels[i]))
try:
tw = int(dr.textlength(txt, font=font)) # width in pixels
except Exception:
tw = 8 * len(txt)
dr.rectangle([x1, max(0, y1 - 12), x1 + tw + 6, y1], fill=(255, 0, 0))
dr.text((x1 + 3, max(0, y1 - 12)), txt, fill=(255, 255, 255), font=font)
return np.array(im)
def colorize_instance_masks(masks, H, W):
"""
Assign a distinct random color to each instance mask and composite into
a single HxWx3 overlay. Non-overlapping rectangles will show distinct
colors; overlapping regions take the last written color.
"""
N = masks.shape[0]
if N == 0:
return np.zeros((H, W, 3), dtype=np.uint8)
# Deterministic color palette for repeatability
rng = np.random.default_rng(1234)
colors = rng.integers(64, 255, size=(N, 3), dtype=np.uint8)
overlay = np.zeros((H, W, 3), dtype=np.uint8)
for i in range(N):
m = masks[i].astype(bool) # to boolean for indexing
overlay[m] = colors[i] # broadcast color into masked pixels
return overlay
def blend(a, b, alpha=0.5):
"""
Alpha-blend two RGB images (uint8). 'a' is usually the original,
'b' the color overlay. alpha in [0,1]: 0 -> only b, 1 -> only a.
"""
return (alpha * a.astype(np.float32) + (1 - alpha) * b.astype(np.float32)).clip(0, 255).astype(np.uint8)
def side_by_side(left, right):
"""Horizontally concatenate two same-height images."""
assert left.shape[0] == right.shape[0]
return np.concatenate([left, right], axis=1)
# ---------- Main: generate, mask, and plot ----------
if __name__ == "__main__":
# Image size and how many samples to create
H, W, B = 256, 256, 3
# 1) Make synthetic samples (images + boxes + labels)
samples = make_synthetic_samples(B=B, H=H, W=W, seed=77)
# 2) Build per-box masks with Numba (first call compiles if Numba is enabled)
for s in samples:
# rect_masks_numba returns [N, H, W] uint8 masks (0/1)
s.masks = rect_masks_numba(s.boxes.astype(np.float32), H, W)
# 3) Visualize each sample:
# Left: image with red rectangles
# Right: same image with colored mask overlay
for i, s in enumerate(samples):
left = draw_boxes_on_image(s.image, s.boxes, s.labels)
overlay = colorize_instance_masks(s.masks, H, W)
right = blend(s.image, overlay, alpha=0.5)
both = side_by_side(left, right)
# One figure per sample (keeps UI simple and readable)
plt.figure(figsize=(10, 5))
plt.axis("off")
plt.title(f"Sample {i} — left: boxes, right: mask overlay")
plt.imshow(both)
# Render all figures at the end
plt.show()
In [5]:
# numba_rect_mask_demo.py
# pip install numba numpy matplotlib pillow
# ------------------------------------------------------------
# Synthetic images + labels/bboxes + Numba rectangle masks,
# with side-by-side visualization using matplotlib.
#
# Quick start (CPU only):
# pip install numba numpy matplotlib pillow
# python numba_rect_mask_demo.py
#
# Notes:
# - Numba JIT-compiles the rectangles→mask routine on first call; that's why
# the first run is slower than subsequent runs.
# - Everything here is CPU. If you later want GPU, prefer vectorized PyTorch
# or a dedicated CUDA kernel. Numba can also do CUDA, but that's a different
# code path.
# ------------------------------------------------------------
from dataclasses import dataclass
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw, ImageFont
# ---------- Optional acceleration with Numba ----------
# We try to import Numba. If it's missing, the code falls back to a pure-NumPy
# version of the same kernel so the script still runs end-to-end.
try:
from numba import njit, prange
USE_NUMBA = True
except Exception as e:
USE_NUMBA = False
print("Numba not available, falling back to pure NumPy. Error:", e)
# ---------- Rectangles → instance masks (per image) ----------
# This function turns N axis-aligned boxes into N binary masks of size HxW.
# Each mask[i] is 1 inside the i-th box, 0 elsewhere.
if USE_NUMBA:
@njit(parallel=True, fastmath=False)
def rect_masks_numba(boxes, H, W):
"""
Args:
boxes : float32/float64 array of shape [N, 4] with (x1, y1, x2, y2) in pixel coords
H, W : output mask height and width
Returns:
masks : uint8 array of shape [N, H, W] with values {0,1}
"""
N = boxes.shape[0]
# Allocate all masks in one go. dtype=uint8 keeps memory small; we can
# cast to bool later for frameworks that prefer it.
out = np.zeros((N, H, W), dtype=np.uint8)
# prange parallelizes the outer loop across boxes; good when there are many boxes.
for i in prange(N):
# Read and integerize coordinates (safe for pixel indexing)
x1 = int(boxes[i, 0]); y1 = int(boxes[i, 1])
x2 = int(boxes[i, 2]); y2 = int(boxes[i, 3])
# Ensure (x1,y1) is top-left and (x2,y2) is bottom-right
if x2 < x1: x1, x2 = x2, x1
if y2 < y1: y1, y2 = y2, y1
# Clamp to image bounds to avoid index errors
if x1 < 0: x1 = 0
if y1 < 0: y1 = 0
if x2 > W: x2 = W
if y2 > H: y2 = H
# Only write if the box has positive area
if x2 > x1 and y2 > y1:
out[i, y1:y2, x1:x2] = 1
return out
else:
# Drop-in replacement when Numba isn't available.
def rect_masks_numba(boxes, H, W):
N = boxes.shape[0]
out = np.zeros((N, H, W), dtype=np.uint8)
for i in range(N):
x1 = int(boxes[i, 0]); y1 = int(boxes[i, 1])
x2 = int(boxes[i, 2]); y2 = int(boxes[i, 3])
if x2 < x1: x1, x2 = x2, x1
if y2 < y1: y1, y2 = y2, y1
x1 = max(0, min(W, x1)); x2 = max(0, min(W, x2))
y1 = max(0, min(H, y1)); y2 = max(0, min(H, y2))
if x2 > x1 and y2 > y1:
out[i, y1:y2, x1:x2] = 1
return out
# ---------- Simple container for one synthetic sample ----------
@dataclass
class Sample:
image: np.ndarray # shape [H, W, 3], dtype=uint8 (RGB)
boxes: np.ndarray # shape [N, 4], dtype=float32, (x1,y1,x2,y2)
labels: np.ndarray # shape [N], dtype=int64
masks: np.ndarray = None # shape [N, H, W], dtype=uint8 (filled later)
# ---------- Random box generator ----------
def random_boxes(H, W, n_boxes, rng):
"""
Produces n_boxes axis-aligned rectangles inside the image.
We also enforce a minimum width/height so boxes are visible.
"""
boxes = np.zeros((n_boxes, 4), dtype=np.float32)
for i in range(n_boxes):
# random top-left
x1 = rng.integers(0, W - 1)
y1 = rng.integers(0, H - 1)
# random width/height within a reasonable range
bw = int(rng.integers(max(4, W // 16), max(8, W // 3)))
bh = int(rng.integers(max(4, H // 16), max(8, H // 3)))
# bottom-right, clamped to image
x2 = min(W, x1 + bw)
y2 = min(H, y1 + bh)
# boxes are (x1, y1, x2, y2)
boxes[i] = (min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
return boxes
# ---------- Create B synthetic samples ----------
def make_synthetic_samples(B=3, H=256, W=256, seed=7):
"""
Returns a list of Samples with:
- a smooth gradient + noise background image
- 2..5 random boxes per image
- labels in {1,2,3,4}
"""
rng = np.random.default_rng(seed)
samples = []
for _ in range(B):
# Make a pseudo-natural background: gradient + Gaussian noise
gy = np.linspace(0, 1, H, dtype=np.float32)[:, None]
gx = np.linspace(0, 1, W, dtype=np.float32)[None, :]
base = 0.6 * gy + 0.4 * gx + 0.05 * rng.standard_normal((H, W)).astype(np.float32)
base = np.clip(base, 0, 1)
# Stack slightly shifted channels so it looks more colorful
img = (255 * np.stack([
base,
np.roll(base, 12, axis=1),
np.roll(base, 24, axis=0)
], axis=-1)).astype(np.uint8)
# Random boxes and integer labels
n_boxes = int(rng.integers(2, 6)) # 2..5 per image
boxes = random_boxes(H, W, n_boxes, rng)
labels = rng.integers(1, 5, size=(n_boxes,), dtype=np.int64)
samples.append(Sample(image=img, boxes=boxes, labels=labels))
return samples
# ---------- Visualization helpers ----------
def draw_boxes_on_image(img_rgb, boxes, labels):
"""
Returns a copy of img_rgb with red rectangles and small label badges.
PIL is used for convenience; cv2 would work as well.
"""
im = Image.fromarray(img_rgb.copy())
dr = ImageDraw.Draw(im)
try:
font = ImageFont.load_default()
except Exception:
font = None
for i, box in enumerate(boxes):
x1, y1, x2, y2 = [int(v) for v in box.tolist()]
# Draw rectangle
dr.rectangle([x1, y1, x2, y2], outline=(255, 0, 0), width=2)
# A tiny label "badge" near the top-left corner
txt = str(int(labels[i]))
try:
tw = int(dr.textlength(txt, font=font)) # width in pixels
except Exception:
tw = 8 * len(txt)
dr.rectangle([x1, max(0, y1 - 12), x1 + tw + 6, y1], fill=(255, 0, 0))
dr.text((x1 + 3, max(0, y1 - 12)), txt, fill=(255, 255, 255), font=font)
return np.array(im)
def colorize_instance_masks(masks, H, W):
"""
Assign a distinct random color to each instance mask and composite into
a single HxWx3 overlay. Non-overlapping rectangles will show distinct
colors; overlapping regions take the last written color.
"""
N = masks.shape[0]
if N == 0:
return np.zeros((H, W, 3), dtype=np.uint8)
# Deterministic color palette for repeatability
rng = np.random.default_rng(1234)
colors = rng.integers(64, 255, size=(N, 3), dtype=np.uint8)
overlay = np.zeros((H, W, 3), dtype=np.uint8)
for i in range(N):
m = masks[i].astype(bool) # to boolean for indexing
overlay[m] = colors[i] # broadcast color into masked pixels
return overlay
def blend(a, b, alpha=0.5):
"""
Alpha-blend two RGB images (uint8). 'a' is usually the original,
'b' the color overlay. alpha in [0,1]: 0 -> only b, 1 -> only a.
"""
return (alpha * a.astype(np.float32) + (1 - alpha) * b.astype(np.float32)).clip(0, 255).astype(np.uint8)
def blend_masked(img, overlay, alpha=0.5):
"""
Overlay only on masked pixels; keep background identical.
img, overlay: HxWx3 uint8. overlay is 0 outside mask.
"""
out = img.copy()
m = overlay.any(axis=2) # HxW boolean mask: True where any color channel > 0
# alpha-blend only masked pixels
out[m] = (img[m] * (1 - alpha) + overlay[m] * alpha).clip(0, 255).astype(np.uint8)
return out
def side_by_side(left, right):
"""Horizontally concatenate two same-height images."""
assert left.shape[0] == right.shape[0]
return np.concatenate([left, right], axis=1)
# ---------- Main: generate, mask, and plot ----------
if __name__ == "__main__":
# Image size and how many samples to create
H, W, B = 256, 256, 3
# 1) Make synthetic samples (images + boxes + labels)
samples = make_synthetic_samples(B=B, H=H, W=W, seed=77)
# 2) Build per-box masks with Numba (first call compiles if Numba is enabled)
for s in samples:
# rect_masks_numba returns [N, H, W] uint8 masks (0/1)
s.masks = rect_masks_numba(s.boxes.astype(np.float32), H, W)
# 3) Visualize each sample:
# Left: image with red rectangles
# Right: same image with colored mask overlay
for i, s in enumerate(samples):
left = draw_boxes_on_image(s.image, s.boxes, s.labels)
overlay = colorize_instance_masks(s.masks, H, W)
right = blend(s.image, overlay, alpha=0.5)
right = blend_masked(s.image, overlay, alpha=0.5)
both = side_by_side(left, right)
# One figure per sample (keeps UI simple and readable)
plt.figure(figsize=(10, 5))
plt.axis("off")
plt.title(f"Sample {i} — left: boxes, right: mask overlay")
plt.imshow(both)
# Render all figures at the end
plt.show()
In [6]:
# numba_rect_mask_demo.py
# pip install numba numpy matplotlib pillow
# ------------------------------------------------------------
# Synthetic images + labels/bboxes + Numba rectangle masks,
# with side-by-side visualization using matplotlib.
#
# Quick start (CPU only):
# pip install numba numpy matplotlib pillow
# python numba_rect_mask_demo.py
#
# Notes:
# - Numba JIT-compiles the rectangles→mask routine on first call; that's why
# the first run is slower than subsequent runs.
# - This demo shows the original image on the LEFT, and on the RIGHT keeps
# ONLY the pixels inside any bbox (everything else is set to 0).
# ------------------------------------------------------------
from dataclasses import dataclass
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw, ImageFont
# ---------- Optional acceleration with Numba ----------
# If Numba isn't available, we fall back to a pure-NumPy kernel.
try:
from numba import njit, prange
USE_NUMBA = True
except Exception as e:
USE_NUMBA = False
print("Numba not available, falling back to pure NumPy. Error:", e)
# ---------- Rectangles → per-instance masks (per image) ----------
# Turns N axis-aligned boxes into N binary masks (HxW).
if USE_NUMBA:
@njit(parallel=True, fastmath=False)
def rect_masks_numba(boxes, H, W):
"""
boxes: float32/float64 [N,4] in (x1, y1, x2, y2)
returns: uint8 masks [N,H,W] with values {0,1}
"""
N = boxes.shape[0]
out = np.zeros((N, H, W), dtype=np.uint8)
for i in prange(N): # parallelize over boxes
x1 = int(boxes[i, 0]); y1 = int(boxes[i, 1])
x2 = int(boxes[i, 2]); y2 = int(boxes[i, 3])
# order & clamp
if x2 < x1: x1, x2 = x2, x1
if y2 < y1: y1, y2 = y2, y1
if x1 < 0: x1 = 0
if y1 < 0: y1 = 0
if x2 > W: x2 = W
if y2 > H: y2 = H
if x2 > x1 and y2 > y1:
out[i, y1:y2, x1:x2] = 1
return out
else:
def rect_masks_numba(boxes, H, W):
N = boxes.shape[0]
out = np.zeros((N, H, W), dtype=np.uint8)
for i in range(N):
x1 = int(boxes[i, 0]); y1 = int(boxes[i, 1])
x2 = int(boxes[i, 2]); y2 = int(boxes[i, 3])
if x2 < x1: x1, x2 = x2, x1
if y2 < y1: y1, y2 = y2, y1
x1 = max(0, min(W, x1)); x2 = max(0, min(W, x2))
y1 = max(0, min(H, y1)); y2 = max(0, min(H, y2))
if x2 > x1 and y2 > y1:
out[i, y1:y2, x1:x2] = 1
return out
# ---------- Data container ----------
@dataclass
class Sample:
image: np.ndarray # [H,W,3] uint8 (RGB)
boxes: np.ndarray # [N,4] float32 (x1,y1,x2,y2)
labels: np.ndarray # [N] int64
masks: np.ndarray = None # [N,H,W] uint8 (filled later)
# ---------- Random box generator ----------
def random_boxes(H, W, n_boxes, rng):
"""Produce n_boxes rectangles inside the image with minimum sizes."""
boxes = np.zeros((n_boxes, 4), dtype=np.float32)
for i in range(n_boxes):
x1 = rng.integers(0, W - 1)
y1 = rng.integers(0, H - 1)
bw = int(rng.integers(max(4, W // 16), max(8, W // 3)))
bh = int(rng.integers(max(4, H // 16), max(8, H // 3)))
x2 = min(W, x1 + bw)
y2 = min(H, y1 + bh)
boxes[i] = (min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
return boxes
# ---------- Synthetic sample maker ----------
def make_synthetic_samples(B=3, H=256, W=256, seed=7):
"""
Create B samples with gradient+noise background, 2..5 random boxes each,
and labels in {1,2,3,4}.
"""
rng = np.random.default_rng(seed)
samples = []
for _ in range(B):
gy = np.linspace(0, 1, H, dtype=np.float32)[:, None]
gx = np.linspace(0, 1, W, dtype=np.float32)[None, :]
base = 0.6 * gy + 0.4 * gx + 0.05 * rng.standard_normal((H, W)).astype(np.float32)
base = np.clip(base, 0, 1)
img = (255 * np.stack([base, np.roll(base, 12, axis=1), np.roll(base, 24, axis=0)], axis=-1)).astype(np.uint8)
n_boxes = int(rng.integers(2, 6)) # 2..5
boxes = random_boxes(H, W, n_boxes, rng)
labels = rng.integers(1, 5, size=(n_boxes,), dtype=np.int64)
samples.append(Sample(image=img, boxes=boxes, labels=labels))
return samples
# ---------- Drawing helpers ----------
def draw_boxes_on_image(img_rgb, boxes, labels):
"""Return a copy of img_rgb with red rectangles and small label badges."""
im = Image.fromarray(img_rgb.copy())
dr = ImageDraw.Draw(im)
try:
font = ImageFont.load_default()
except Exception:
font = None
for i, box in enumerate(boxes):
x1, y1, x2, y2 = [int(v) for v in box.tolist()]
dr.rectangle([x1, y1, x2, y2], outline=(255, 0, 0), width=2)
txt = str(int(labels[i]))
try:
tw = int(dr.textlength(txt, font=font))
except Exception:
tw = 8 * len(txt)
dr.rectangle([x1, max(0, y1 - 12), x1 + tw + 6, y1], fill=(255, 0, 0))
dr.text((x1 + 3, max(0, y1 - 12)), txt, fill=(255, 255, 255), font=font)
return np.array(im)
def apply_union_mask(img, masks):
"""
Keep only pixels inside ANY bbox; zero out everything else.
img: [H,W,3] uint8
masks: [N,H,W] uint8 (0/1)
returns: [H,W,3] uint8
"""
if masks.size == 0:
return np.zeros_like(img)
m = (masks != 0).any(axis=0).astype(np.uint8) # [H,W] 0/1 union of masks
return (img * m[..., None]).astype(np.uint8)
def side_by_side(left, right):
"""Horizontally concatenate two same-height images."""
assert left.shape[0] == right.shape[0]
return np.concatenate([left, right], axis=1)
# ---------- Main ----------
if __name__ == "__main__":
H, W, B = 256, 256, 3
# 1) Make synthetic samples (images + boxes + labels)
samples = make_synthetic_samples(B=B, H=H, W=W, seed=77)
# 2) Build per-box masks with Numba (first call compiles if Numba is enabled)
for s in samples:
s.masks = rect_masks_numba(s.boxes.astype(np.float32), H, W) # [N,H,W] uint8 (0/1)
# 3) Visualize each sample:
# Left: image with red rectangles
# Right: ONLY bboxed regions visible, everything else set to 0
for i, s in enumerate(samples):
left = draw_boxes_on_image(s.image, s.boxes, s.labels)
right = apply_union_mask(s.image, s.masks)
both = side_by_side(left, right)
plt.figure(figsize=(10, 5))
plt.axis("off")
plt.title(f"Sample {i} — left: boxes, right: masked (outside=0)")
plt.imshow(both)
plt.show()
In [7]:
# numba_rect_mask_demo_binary.py
# pip install numba numpy matplotlib pillow
# ------------------------------------------------------------
# Synthetic images + labels/bboxes + Numba rectangle masks.
# LEFT = original image with red rectangle outlines
# RIGHT = binary union mask (strict 0/1; shown as 0/255 for viewing)
#
# Quick start (CPU only):
# pip install numba numpy matplotlib pillow
# python numba_rect_mask_demo_binary.py
# ------------------------------------------------------------
from dataclasses import dataclass
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw, ImageFont
# ---------- Optional acceleration with Numba ----------
try:
from numba import njit, prange
USE_NUMBA = True
except Exception as e:
USE_NUMBA = False
print("Numba not available, falling back to pure NumPy. Error:", e)
# ---------- Rectangles → per-instance masks (strict 0/1) ----------
if USE_NUMBA:
@njit(parallel=True, fastmath=False)
def rect_masks_numba(boxes, H, W):
"""
boxes: float32/float64 [N,4] in (x1, y1, x2, y2)
returns: uint8 masks [N,H,W] with values {0,1}
"""
N = boxes.shape[0]
out = np.zeros((N, H, W), dtype=np.uint8)
for i in prange(N): # parallelize over boxes
x1 = int(boxes[i, 0]); y1 = int(boxes[i, 1])
x2 = int(boxes[i, 2]); y2 = int(boxes[i, 3])
# order & clamp
if x2 < x1: x1, x2 = x2, x1
if y2 < y1: y1, y2 = y2, y1
if x1 < 0: x1 = 0
if y1 < 0: y1 = 0
if x2 > W: x2 = W
if y2 > H: y2 = H
if x2 > x1 and y2 > y1:
out[i, y1:y2, x1:x2] = 1
return out
else:
def rect_masks_numba(boxes, H, W):
N = boxes.shape[0]
out = np.zeros((N, H, W), dtype=np.uint8)
for i in range(N):
x1 = int(boxes[i, 0]); y1 = int(boxes[i, 1])
x2 = int(boxes[i, 2]); y2 = int(boxes[i, 3])
if x2 < x1: x1, x2 = x2, x1
if y2 < y1: y1, y2 = y2, y1
x1 = max(0, min(W, x1)); x2 = max(0, min(W, x2))
y1 = max(0, min(H, y1)); y2 = max(0, min(H, y2))
if x2 > x1 and y2 > y1:
out[i, y1:y2, x1:x2] = 1
return out
# ---------- Data container ----------
@dataclass
class Sample:
image: np.ndarray # [H,W,3] uint8 (RGB)
boxes: np.ndarray # [N,4] float32 (x1,y1,x2,y2)
labels: np.ndarray # [N] int64
masks: np.ndarray = None # [N,H,W] uint8 in {0,1}
# ---------- Random box generator ----------
def random_boxes(H, W, n_boxes, rng):
boxes = np.zeros((n_boxes, 4), dtype=np.float32)
for i in range(n_boxes):
x1 = rng.integers(0, W - 1)
y1 = rng.integers(0, H - 1)
bw = int(rng.integers(max(4, W // 16), max(8, W // 3)))
bh = int(rng.integers(max(4, H // 16), max(8, H // 3)))
x2 = min(W, x1 + bw)
y2 = min(H, y1 + bh)
boxes[i] = (min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
return boxes
# ---------- Synthetic sample maker ----------
def make_synthetic_samples(B=3, H=256, W=256, seed=7):
rng = np.random.default_rng(seed)
samples = []
for _ in range(B):
gy = np.linspace(0, 1, H, dtype=np.float32)[:, None]
gx = np.linspace(0, 1, W, dtype=np.float32)[None, :]
base = 0.6 * gy + 0.4 * gx + 0.05 * rng.standard_normal((H, W)).astype(np.float32)
base = np.clip(base, 0, 1)
img = (255 * np.stack([base, np.roll(base, 12, axis=1), np.roll(base, 24, axis=0)], axis=-1)).astype(np.uint8)
n_boxes = int(rng.integers(2, 6)) # 2..5
boxes = random_boxes(H, W, n_boxes, rng)
labels = rng.integers(1, 5, size=(n_boxes,), dtype=np.int64)
samples.append(Sample(image=img, boxes=boxes, labels=labels))
return samples
# ---------- Drawing helpers ----------
def draw_boxes_on_image(img_rgb, boxes, labels):
im = Image.fromarray(img_rgb.copy())
dr = ImageDraw.Draw(im)
try:
font = ImageFont.load_default()
except Exception:
font = None
for i, box in enumerate(boxes):
x1, y1, x2, y2 = [int(v) for v in box.tolist()]
dr.rectangle([x1, y1, x2, y2], outline=(255, 0, 0), width=2)
txt = str(int(labels[i]))
try:
tw = int(dr.textlength(txt, font=font))
except Exception:
tw = 8 * len(txt)
dr.rectangle([x1, max(0, y1 - 12), x1 + tw + 6, y1], fill=(255, 0, 0))
dr.text((x1 + 3, max(0, y1 - 12)), txt, fill=(255, 255, 255), font=font)
return np.array(im)
# ---------- Binary mask utilities ----------
def union_mask_binary(masks):
"""Combine [N,H,W] 0/1 masks into a single 0/1 mask [H,W]."""
if masks.size == 0:
return None
return (masks != 0).any(axis=0).astype(np.uint8) # 0 or 1
def binary_to_rgb(mask_01):
"""Show a 0/1 mask as 0/255 grayscale RGB for plotting."""
vis = (mask_01 * 255).astype(np.uint8) # [H,W] 0/255
return np.repeat(vis[..., None], 3, axis=2) # [H,W,3]
def side_by_side(left, right):
assert left.shape[0] == right.shape[0]
return np.concatenate([left, right], axis=1)
# ---------- Main ----------
if __name__ == "__main__":
H, W, B = 256, 256, 3
# 1) Make synthetic samples (images + boxes + labels)
samples = make_synthetic_samples(B=B, H=H, W=W, seed=77)
# 2) Build per-box masks (strict 0/1)
for s in samples:
s.masks = rect_masks_numba(s.boxes.astype(np.float32), H, W) # [N,H,W] uint8 in {0,1}
# 3) Visualize:
# LEFT = original with red rectangles
# RIGHT = binary union mask (0/1, shown as 0/255)
for i, s in enumerate(samples):
left = draw_boxes_on_image(s.image, s.boxes, s.labels)
union_01 = union_mask_binary(s.masks) # [H,W] 0/1
right = binary_to_rgb(union_01) # [H,W,3] for side-by-side
both = side_by_side(left, right)
plt.figure(figsize=(10, 5))
plt.axis("off")
plt.title(f"Sample {i} — left: boxes, right: binary mask (0/1)")
plt.imshow(both)
plt.show()
Polygon with OpenCV¶
In [ ]:
In [8]:
# pip install numpy matplotlib pillow opencv-python
# poly_mask_demo.py
# ------------------------------------------------------------
# Synthetic images + labels/polygons + OpenCV polygon masks,
# with side-by-side visualization using matplotlib.
# LEFT = original image with red polygon outlines
# RIGHT = only pixels inside any polygon (outside set to 0)
# ------------------------------------------------------------
from dataclasses import dataclass
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw, ImageFont
import cv2
# ---------- Data container ----------
@dataclass
class SamplePoly:
image: np.ndarray # [H,W,3] uint8 (RGB)
polygons: list # list of (Mi,2) int32 arrays
labels: np.ndarray # [N] int64
masks: np.ndarray = None # [N,H,W] uint8, filled later
# ---------- Polygon generation ----------
def random_convex_polygon(H, W, rng, n_vertices=5, min_r=None, max_r=None):
"""
Make a random convex-ish polygon by sampling radii on sorted angles.
Returns (n_vertices, 2) int32 array clipped to image bounds.
"""
if min_r is None: min_r = max(8, min(H, W) // 10)
if max_r is None: max_r = max(min(H, W) // 4, min_r + 1)
# center away from borders
cx = int(rng.integers(min_r, W - min_r))
cy = int(rng.integers(min_r, H - min_r))
angles = np.sort(rng.uniform(0, 2 * np.pi, size=n_vertices))
radii = rng.uniform(min_r, max_r, size=n_vertices)
xs = cx + (radii * np.cos(angles))
ys = cy + (radii * np.sin(angles))
pts = np.stack([xs, ys], axis=1)
# clip & to int32 for OpenCV
pts[:, 0] = np.clip(pts[:, 0], 0, W - 1)
pts[:, 1] = np.clip(pts[:, 1], 0, H - 1)
return pts.astype(np.int32)
def make_synthetic_poly_samples(B=3, H=256, W=256, seed=7):
"""
Build B samples with gradient+noise images and 2..5 random polygons each.
"""
rng = np.random.default_rng(seed)
samples = []
for _ in range(B):
# colorful gradient + noise background
gy = np.linspace(0, 1, H, dtype=np.float32)[:, None]
gx = np.linspace(0, 1, W, dtype=np.float32)[None, :]
base = 0.6 * gy + 0.4 * gx + 0.05 * rng.standard_normal((H, W)).astype(np.float32)
base = np.clip(base, 0, 1)
img = (255 * np.stack([base, np.roll(base, 10, 1), np.roll(base, 20, 0)], axis=-1)).astype(np.uint8)
n_polys = int(rng.integers(2, 6)) # 2..5
polys = []
for _p in range(n_polys):
k = int(rng.integers(3, 8)) # 3..7 vertices
polys.append(random_convex_polygon(H, W, rng, n_vertices=k))
labels = rng.integers(1, 5, size=(n_polys,), dtype=np.int64)
samples.append(SamplePoly(image=img, polygons=polys, labels=labels))
return samples
# ---------- Polygons → per-instance masks (OpenCV) ----------
def polygons_to_masks_cv(polygons, H, W):
"""
polygons: list of (Mi,2) int32 arrays
returns: uint8 masks [N,H,W] with 0/1
"""
N = len(polygons)
masks = np.zeros((N, H, W), dtype=np.uint8)
for i, poly in enumerate(polygons):
# Ensure the array is int32 for cv2 and shape (Mi,1,2) or (Mi,2)
poly_i = np.asarray(poly, dtype=np.int32)
cv2.fillPoly(masks[i], [poly_i], 1) # draw into the i-th 2D mask
return masks
# ---------- Visualization ----------
def draw_polygons_on_image(img_rgb, polygons, labels):
"""Return a copy of the image with red polygon outlines + tiny label badges."""
im = Image.fromarray(img_rgb.copy())
dr = ImageDraw.Draw(im)
try:
font = ImageFont.load_default()
except Exception:
font = None
for i, poly in enumerate(polygons):
pts = [(int(x), int(y)) for x, y in poly]
dr.polygon(pts, outline=(255, 0, 0))
# label near the first vertex
x1, y1 = pts[0]
txt = str(int(labels[i]))
try:
tw = int(dr.textlength(txt, font=font))
except Exception:
tw = 8 * len(txt)
dr.rectangle([x1, max(0, y1 - 12), x1 + tw + 6, y1], fill=(255, 0, 0))
dr.text((x1 + 3, max(0, y1 - 12)), txt, fill=(255, 255, 255), font=font)
return np.array(im)
def apply_union_mask(img, masks):
"""
Keep only pixels inside ANY polygon; zero out everything else.
img: [H,W,3] uint8
masks: [N,H,W] uint8 (0/1)
"""
if masks.size == 0:
return np.zeros_like(img)
m = (masks != 0).any(axis=0).astype(np.uint8) # union across instances
return (img * m[..., None]).astype(np.uint8)
def side_by_side(left, right):
assert left.shape[0] == right.shape[0]
return np.concatenate([left, right], axis=1)
# ---------- Main ----------
if __name__ == "__main__":
H, W, B = 256, 256, 3
samples = make_synthetic_poly_samples(B=B, H=H, W=W, seed=77)
# Rasterize per-polygon masks with OpenCV
for s in samples:
s.masks = polygons_to_masks_cv(s.polygons, H, W) # [N,H,W] uint8
# Show: left = outlines, right = only polygon regions visible
for i, s in enumerate(samples):
left = draw_polygons_on_image(s.image, s.polygons, s.labels)
right = apply_union_mask(s.image, s.masks)
both = side_by_side(left, right)
plt.figure(figsize=(10, 5))
plt.axis("off")
plt.title(f"Sample {i} — left: polygons, right: masked (outside=0)")
plt.imshow(both)
plt.show()
In [9]:
# pip install numpy matplotlib pillow opencv-python
# poly_mask_demo_binary.py
# ------------------------------------------------------------
# Synthetic images + labels/polygons + OpenCV polygon masks.
# LEFT = original image with red polygon outlines
# RIGHT = binary union mask (strict 0/1; shown as 0/255 for viewing)
# ------------------------------------------------------------
from dataclasses import dataclass
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw, ImageFont
import cv2
# ---------- Data container ----------
@dataclass
class SamplePoly:
image: np.ndarray # [H,W,3] uint8 (RGB)
polygons: list # list of (Mi,2) int32 arrays
labels: np.ndarray # [N] int64
masks: np.ndarray = None # [N,H,W] uint8 in {0,1}, filled later
# ---------- Polygon generation ----------
def random_convex_polygon(H, W, rng, n_vertices=5, min_r=None, max_r=None):
"""Return (n_vertices, 2) int32 polygon within image bounds."""
if min_r is None: min_r = max(8, min(H, W) // 10)
if max_r is None: max_r = max(min(H, W) // 4, min_r + 1)
cx = int(rng.integers(min_r, W - min_r))
cy = int(rng.integers(min_r, H - min_r))
angles = np.sort(rng.uniform(0, 2 * np.pi, size=n_vertices))
radii = rng.uniform(min_r, max_r, size=n_vertices)
xs = cx + (radii * np.cos(angles))
ys = cy + (radii * np.sin(angles))
pts = np.stack([xs, ys], axis=1)
pts[:, 0] = np.clip(pts[:, 0], 0, W - 1)
pts[:, 1] = np.clip(pts[:, 1], 0, H - 1)
return pts.astype(np.int32)
def make_synthetic_poly_samples(B=3, H=256, W=256, seed=7):
"""Build B samples with gradient+noise images and 2..5 random polygons each."""
rng = np.random.default_rng(seed)
samples = []
for _ in range(B):
# colorful gradient + noise background
gy = np.linspace(0, 1, H, dtype=np.float32)[:, None]
gx = np.linspace(0, 1, W, dtype=np.float32)[None, :]
base = 0.6 * gy + 0.4 * gx + 0.05 * rng.standard_normal((H, W)).astype(np.float32)
base = np.clip(base, 0, 1)
img = (255 * np.stack([base, np.roll(base, 10, 1), np.roll(base, 20, 0)], axis=-1)).astype(np.uint8)
n_polys = int(rng.integers(2, 6)) # 2..5
polys = []
for _p in range(n_polys):
k = int(rng.integers(3, 8)) # 3..7 vertices
polys.append(random_convex_polygon(H, W, rng, n_vertices=k))
labels = rng.integers(1, 5, size=(n_polys,), dtype=np.int64)
samples.append(SamplePoly(image=img, polygons=polys, labels=labels))
return samples
# ---------- Polygons → per-instance masks (strict 0/1) ----------
def polygons_to_masks_cv(polygons, H, W):
"""
polygons: list[(Mi,2) int32]
returns: masks uint8 [N,H,W] with values {0,1}
"""
N = len(polygons)
masks = np.zeros((N, H, W), dtype=np.uint8)
for i, poly in enumerate(polygons):
poly_i = np.asarray(poly, dtype=np.int32)
# Fill with value 1 (NOT 255) so masks are 0/1, dtype uint8
cv2.fillPoly(masks[i], [poly_i], 1)
return masks
# ---------- Visualization ----------
def draw_polygons_on_image(img_rgb, polygons, labels):
"""Return a copy with red polygon outlines + small label badges."""
im = Image.fromarray(img_rgb.copy())
dr = ImageDraw.Draw(im)
try:
font = ImageFont.load_default()
except Exception:
font = None
for i, poly in enumerate(polygons):
pts = [(int(x), int(y)) for x, y in poly]
dr.polygon(pts, outline=(255, 0, 0))
x1, y1 = pts[0]
txt = str(int(labels[i]))
try:
tw = int(dr.textlength(txt, font=font))
except Exception:
tw = 8 * len(txt)
dr.rectangle([x1, max(0, y1 - 12), x1 + tw + 6, y1], fill=(255, 0, 0))
dr.text((x1 + 3, max(0, y1 - 12)), txt, fill=(255, 255, 255), font=font)
return np.array(im)
def union_mask_binary(masks):
"""Combine [N,H,W] 0/1 masks into a single 0/1 mask [H,W]."""
if masks.size == 0:
return None
return (masks != 0).any(axis=0).astype(np.uint8) # 0 or 1
def binary_to_rgb(mask_01):
"""Visualize a 0/1 mask as 0/255 RGB so it can be concatenated with the photo."""
vis = (mask_01 * 255).astype(np.uint8) # [H,W] 0/255
return np.repeat(vis[..., None], 3, axis=2) # [H,W,3]
def side_by_side(left, right):
assert left.shape[0] == right.shape[0]
return np.concatenate([left, right], axis=1)
# ---------- Main ----------
if __name__ == "__main__":
H, W, B = 256, 256, 3
samples = make_synthetic_poly_samples(B=B, H=H, W=W, seed=77)
# Per-polygon 0/1 masks
for s in samples:
s.masks = polygons_to_masks_cv(s.polygons, H, W) # [N,H,W] uint8 in {0,1}
# Show: left = outlines, right = **binary** union mask (0/1 shown as 0/255)
for i, s in enumerate(samples):
left = draw_polygons_on_image(s.image, s.polygons, s.labels)
union_01 = union_mask_binary(s.masks) # [H,W] 0/1
right = binary_to_rgb(union_01) # [H,W,3] for side-by-side
both = side_by_side(left, right)
plt.figure(figsize=(10, 5))
plt.axis("off")
plt.title(f"Sample {i} — left: polygons, right: binary mask (0/1)")
plt.imshow(both)
plt.show()
Solution To "Kernel Restarting" due to matplotlib¶
In [10]:
# -----------------------------------------------------------------------------
# TEMPORARY WORKAROUND for Intel OpenMP "duplicate library" errors
# (e.g., messages about KMP or 'libiomp' being loaded multiple times).
#
# What this does:
# - Tells Intel's OpenMP runtime to IGNORE the fact that two OpenMP runtimes
# were loaded into the same Python process (often happens when mixing
# conda/pip wheels that bundle different OpenMPs via MKL, NumPy, PyTorch, etc.).
# - Lets the process continue instead of crashing at import time.
#
# IMPORTANT:
# - Use ONLY for quick evaluation/debugging. It can lead to instability,
# hangs, or wrong performance characteristics. Do NOT use in production
# training/inference scripts.
# - Set this BEFORE importing numpy/torch/sklearn (the libraries that load OMP).
# -----------------------------------------------------------------------------
import os
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE" # ⚠️ temporary, evaluation-only
In [11]:
# After matplotlib.plot
os.environ["KMP_DUPLICATE_LIB_OK"] = "FALSE"
Find image files in a folder¶
In [14]:
import os
folder = "images" #Update
# Define image extensions you care about
image_exts = ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp')
# Get only files (not subfolders) that end with those extensions
image_files = [f for f in os.listdir(folder)
if f.lower().endswith(image_exts) and os.path.isfile(os.path.join(folder, f))]
print(image_files)
['0007x4.png']
In [15]:
import os
def get_file_names(folder, image_exts = None, prnt = True):
if image_exts is None:
# Define image extensions you care about
image_exts = ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp')
# Get only files (not subfolders) that end with those extensions
image_files = [f for f in os.listdir(folder)
if f.lower().endswith(image_exts) and os.path.isfile(os.path.join(folder, f))]
if prnt:
print(image_files)
return image_files
In [16]:
import os
def get_file_names_subfolder(folder, image_exts=None, prnt=True):
if image_exts is None:
image_exts = ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp')
image_files = []
# Walk through all subdirectories
for root, _, files in os.walk(folder):
for f in files:
if f.lower().endswith(image_exts):
image_files.append(os.path.join(root, f))
if prnt:
for f in image_files:
print(f)
return image_files
Set or Change your working directory to wherever your jupyter notebook is currently located.¶
In [17]:
import os
from pathlib import Path
# Get the current notebook's directory
workdir = Path().resolve()
os.chdir(workdir)
print("Working directory set to:", os.getcwd())
Working directory set to: C:\Users\kec994\OneDrive - The University of Texas-Rio Grande Valley\PhD\Data_Source\pylearn
End¶
In [ ]: